diff --git a/src/org/jgroups/demos/RelayDemo.java b/src/org/jgroups/demos/RelayDemo.java index 01d600555a..7cdaa0397c 100644 --- a/src/org/jgroups/demos/RelayDemo.java +++ b/src/org/jgroups/demos/RelayDemo.java @@ -1,15 +1,16 @@ package org.jgroups.demos; import org.jgroups.*; -import org.jgroups.protocols.relay.RELAY; -import org.jgroups.protocols.relay.RELAY3; -import org.jgroups.protocols.relay.RouteStatusListener; +import org.jgroups.protocols.relay.*; import org.jgroups.protocols.relay.Topology; import org.jgroups.util.ExtendedUUID; import org.jgroups.util.UUID; import org.jgroups.util.Util; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.Map; /** Demos RELAY. Create 2 *separate* clusters with RELAY as top protocol. Each RELAY has bridge_props="tcp.xml" (tcp.xml * needs to be present). Then start 2 instances in the first cluster and 2 instances in the second cluster. They should @@ -17,7 +18,7 @@ * @author Bela Ban */ public class RelayDemo implements Receiver { - protected static final String SITE_MASTERS="site-masters"; + protected static final String SITE_MASTERS="site-masters", SENDTO="sendto"; protected JChannel ch; protected RELAY relay; @@ -84,24 +85,8 @@ protected void start(String props, String name, boolean print_route_status, if(relay == null) throw new IllegalStateException(String.format("Protocol %s not found", RELAY.class.getSimpleName())); String site_name=relay.site(); - if(print_route_status) { - relay.setRouteStatusListener(new RouteStatusListener() { - public void sitesUp(String... sites) { - System.out.printf("-- %s: site(s) %s came up\n", - ch.getAddress(), String.join(", ", sites)); - } - - public void sitesDown(String... sites) { - System.out.printf("-- %s: site(s) %s went down\n", - ch.getAddress(), String.join(", ", sites)); - } - - public void sitesUnreachable(String... sites) { - System.out.printf("-- %s: site(s) %s are unreachable\n", - ch.getAddress(), String.join(", ", sites)); - } - }); - } + if(print_route_status) + relay.setRouteStatusListener(new DefaultRouteStatusListener(() -> relay.addr()).verbose(true)); if(use_view_handler) relay.topo().setViewHandler((s, v) -> { if(!s.equals(relay.getSite())) @@ -162,9 +147,48 @@ protected boolean process(String line) { System.out.printf("%s\n", relay.printRoutes()); return true; } + if(line.startsWith(SENDTO)) { + sendTo(line); + return true; + } return false; } + // sendto dest msg times sleep + protected void sendTo(String line) { + String[] list=line.split("\\s"); // whitespace, ie. " " + long sleep=Long.parseLong(list[list.length-1]); + int times=Integer.parseInt(list[list.length-2]); + String dest=list[1]; + List tmp=new ArrayList<>(Arrays.asList(list).subList(2, list.length - 2)); + String msg=String.join(" ", tmp); + String[] s=dest.split(":"); + String member_name=s[0], site_name=s[1]; + Map cache=relay.topo().cache(); + View v=cache.get(site_name); + if(v == null) + throw new IllegalArgumentException(String.format("site %s not found in cache", site_name)); + Address target=null; + for(Address addr: v) { + SiteUUID sa=(SiteUUID)addr; + if(member_name.equals(sa.getName())) { + target=sa; + break; + } + } + if(target == null) + throw new IllegalArgumentException(String.format("member %s not found in cache", member_name)); + try { + for(int i=1; i <= times; i++) { + ch.send(target, String.format("%s [#%d]", msg, i)); + Util.sleep(sleep); + } + } + catch(Exception ex) { + System.err.printf("failed sending msg to %s: %s\n", target, ex); + } + } + protected String printTopo(String line, String command, boolean refresh) { String sub=line.substring(command.length()).trim(); String site=null; @@ -188,7 +212,8 @@ protected static void help() { "\nsites: prints the configured sites" + "\ntopo: prints the topology (site masters and local members of all sites)" + "\npt: prints the cache (no refresh)" + - "\nroutes: prints all routes (if site master)\n"); + "\nroutes: prints all routes (if site master)" + + "\nsendto msg \n"); } diff --git a/src/org/jgroups/protocols/relay/RELAY.java b/src/org/jgroups/protocols/relay/RELAY.java index 2db9bc8f7b..787f1e6779 100644 --- a/src/org/jgroups/protocols/relay/RELAY.java +++ b/src/org/jgroups/protocols/relay/RELAY.java @@ -434,4 +434,9 @@ protected void triggerSiteUnreachableEvent(SiteAddress remoteSite) { route_status_listener.sitesUnreachable(remoteSite.getSite()); } + protected void triggerMemberUnreachableEvent(Address mbr) { + if(route_status_listener != null) + route_status_listener.memberUnreachable(mbr); + } + } diff --git a/src/org/jgroups/protocols/relay/RELAY2.java b/src/org/jgroups/protocols/relay/RELAY2.java index 0d7c3d5499..919aa40f04 100644 --- a/src/org/jgroups/protocols/relay/RELAY2.java +++ b/src/org/jgroups/protocols/relay/RELAY2.java @@ -371,7 +371,8 @@ protected void route(SiteAddress dest, SiteAddress sender, Message msg) { Route route=tmp.getRoute(target_site, sender); if(route == null) { if(suppress_log_no_route != null) - suppress_log_no_route.log(SuppressLog.Level.error, target_site, suppress_time_no_route_errors, sender, target_site); + suppress_log_no_route.log(SuppressLog.Level.error, target_site, suppress_time_no_route_errors, + local_addr, sender, dest); else log.error(Util.getMessage("RelayNoRouteToSite"), local_addr, target_site); sendSiteUnreachableTo(msg.getSrc(), target_site); diff --git a/src/org/jgroups/protocols/relay/RELAY3.java b/src/org/jgroups/protocols/relay/RELAY3.java index 2cf9f309b1..b16d271ce5 100644 --- a/src/org/jgroups/protocols/relay/RELAY3.java +++ b/src/org/jgroups/protocols/relay/RELAY3.java @@ -79,10 +79,20 @@ public Object up(Message msg) { Message copy=msg; RelayHeader hdr=msg.getHeader(id); if(hdr != null) { - if(hdr.getType() == SITE_UNREACHABLE) { - triggerSiteUnreachableEvent((SiteAddress)hdr.final_dest); - return null; + switch(hdr.type) { + case SITE_UNREACHABLE: + Set tmp_sites=hdr.getSites(); + if(tmp_sites != null) { + for(String s: tmp_sites) + triggerSiteUnreachableEvent(new SiteMaster(s)); + } + return null; + case MBR_UNREACHABLE: + Address unreachable_mbr=msg.getObject(); + triggerMemberUnreachableEvent(unreachable_mbr); + return null; } + //todo: check if copy is needed! copy=copy(msg).dest(hdr.final_dest).src(hdr.original_sender).putHeader(id, hdr); } @@ -90,7 +100,7 @@ public Object up(Message msg) { } public void up(MessageBatch batch) { - List unreachable_sites=null; + Set unreachable_sites=null; for(Iterator it=batch.iterator(); it.hasNext();) { Message msg=it.next(), copy=msg; // if(msg.isFlagSet(Flag.NO_RELAY)) @@ -98,23 +108,27 @@ public void up(MessageBatch batch) { RelayHeader hdr=msg.getHeader(id); it.remove(); if(hdr != null) { - if(hdr.getType() == SITE_UNREACHABLE) { - SiteAddress site_addr=(SiteAddress)hdr.final_dest; - String site_name=site_addr.getSite(); - if(unreachable_sites == null) - unreachable_sites=new ArrayList<>(); - boolean contains=unreachable_sites.stream().anyMatch(sa -> sa.getSite().equals(site_name)); - if(!contains) - unreachable_sites.add(site_addr); - continue; + switch(hdr.type) { + case SITE_UNREACHABLE: + Set tmp_sites=hdr.getSites(); + if(tmp_sites != null) { + if(unreachable_sites == null) + unreachable_sites=new HashSet<>(); + unreachable_sites.addAll(tmp_sites); + } + continue; + case MBR_UNREACHABLE: + Address unreachable_mbr=msg.getObject(); + triggerMemberUnreachableEvent(unreachable_mbr); + continue; } copy=copy(msg).dest(hdr.final_dest).src(hdr.original_sender).putHeader(id, hdr); } process(false, copy); } if(unreachable_sites != null) { - for(SiteAddress sa: unreachable_sites) - triggerSiteUnreachableEvent(sa); // https://issues.redhat.com/browse/JGRP-2586 + for(String s: unreachable_sites) + triggerSiteUnreachableEvent(new SiteMaster(s)); // https://issues.redhat.com/browse/JGRP-2586 } if(!batch.isEmpty()) up_prot.up(batch); @@ -410,7 +424,8 @@ protected Object route(Message msg, Collection sites) { route=r.getForwardingRouteMatching(s, sender); } if(route == null) { - suppress_log_no_route.log(SuppressLog.Level.error, s, suppress_time_no_route_errors, sender, s); + suppress_log_no_route.log(SuppressLog.Level.error, s, suppress_time_no_route_errors, + local_addr, sender, dest); sendSiteUnreachableTo(msg.getSrc(), s); } else @@ -433,6 +448,10 @@ protected Object deliver(Address next_dest, Message msg, boolean dont_relay) { protected Object deliver(Address next_dest, Message msg, boolean dont_relay, boolean dont_loopback) { checkLocalAddress(next_dest); Address final_dest=msg.dest(), original_sender=msg.src(); + if(final_dest != null && view != null && !view.containsMember(final_dest)) { + sendMemberUnreachableTo(original_sender, final_dest); + return null; + } RelayHeader tmp=msg.getHeader(this.id); // todo: check if copy is needed here RelayHeader hdr=tmp != null? tmp.copy().setOriginalSender(original_sender).setFinalDestination(final_dest) @@ -493,24 +512,40 @@ protected boolean sameSite(SiteAddress addr) { } /** - * Sends a SITE-UNREACHABLE message to the sender of the message. Because the sender is always local (we're the - * relayer), no routing needs to be done - * @param src The node who is trying to send a message to the {@code target_site} + * Sends a SITE-UNREACHABLE message to the sender of the message. + * @param dest The node who is trying to send a message to the {@code target_site} * @param target_site The remote site's name. */ - protected void sendSiteUnreachableTo(Address src, String target_site) { - if (src == null || src.equals(local_addr)) { + protected void sendSiteUnreachableTo(Address dest, String target_site) { + if (dest == null || dest.equals(local_addr)) { //short circuit - // if src == null, it means the message comes from the top protocol (i.e. the local node) + // if dest == null, it means the message comes from the top protocol (i.e. the local node) triggerSiteUnreachableEvent(new SiteMaster(target_site)); return; } - // send message back to the src node. - Message msg=new EmptyMessage(src).setFlag(Flag.OOB) + // send message back to the dest node. + Message msg=new EmptyMessage(dest).setFlag(Flag.OOB) .putHeader(id, new RelayHeader(SITE_UNREACHABLE).addToSites(target_site)); down(msg); } + /** + * Sends a MBR-UNREACHABLE message to the sender of a unicast message + * @param dest The node who original sent the unicast message. Must never be null + */ + protected void sendMemberUnreachableTo(Address dest, Address member) { + if (dest.equals(local_addr)) { + //short circuit + // if dest == null, it means the message comes from the top protocol (i.e. the local node) + triggerMemberUnreachableEvent(member); + return; + } + // send message back to the original sender + Message msg=new ObjectMessage(dest, member).setFlag(Flag.OOB) + .putHeader(id, new RelayHeader(MBR_UNREACHABLE)); + down(msg); + } + protected CompletableFuture startRelayer(Relayer3 rel, String bridge_name) { try { diff --git a/src/org/jgroups/protocols/relay/RelayHeader.java b/src/org/jgroups/protocols/relay/RelayHeader.java index 8d8f92b3ed..9b8d392af3 100644 --- a/src/org/jgroups/protocols/relay/RelayHeader.java +++ b/src/org/jgroups/protocols/relay/RelayHeader.java @@ -21,6 +21,7 @@ public class RelayHeader extends Header { public static final byte DATA = 1; public static final byte SITE_UNREACHABLE = 2; // final_dest is a SiteMaster + public static final byte MBR_UNREACHABLE = 3; // sent back when a given member is not part of the local cluster public static final byte SITES_UP = 4; public static final byte SITES_DOWN = 5; public static final byte TOPO_REQ = 6; @@ -184,6 +185,7 @@ protected static String typeToString(byte type) { switch(type) { case DATA: return "DATA"; case SITE_UNREACHABLE: return "SITE_UNREACHABLE"; + case MBR_UNREACHABLE: return "MBR_UNREACHABLE"; case SITES_UP: return "SITES_UP"; case SITES_DOWN: return "SITES_DOWN"; case TOPO_REQ: return "TOPO_REQ"; diff --git a/tests/junit-functional/org/jgroups/tests/RelayTest.java b/tests/junit-functional/org/jgroups/tests/RelayTest.java index 76d98c7873..1d381aabc4 100644 --- a/tests/junit-functional/org/jgroups/tests/RelayTest.java +++ b/tests/junit-functional/org/jgroups/tests/RelayTest.java @@ -43,7 +43,7 @@ public class RelayTest extends RelayTests { @DataProvider protected Object[][] relayProvider() { return new Object[][] { - // {RELAY2.class}, + {RELAY2.class}, {RELAY3.class} }; } @@ -355,14 +355,14 @@ public void testSitesUp(Class cl) throws Exception { Stream.of(d,e,f,_g,_h,_i) .map(ch -> (RELAY)ch.getProtocolStack().findProtocol(RELAY.class)) - .forEach(r -> r.setRouteStatusListener(new MyRouteStatusListener(r.getAddress()).verbose(false))); + .forEach(r -> r.setRouteStatusListener(new DefaultRouteStatusListener(() -> r.getAddress()).verbose(false))); // now stop A; B will become new site master and we should get a site-down(NYC), then site-up(NYC) Util.close(a); Util.waitUntil(5000, 500, () -> Stream.of(d,e,f,_g,_h,_i) .map(ch -> (RELAY)ch.getProtocolStack().findProtocol(RELAY.class)) .peek(r -> System.out.printf("%s: %s\n", r.getAddress(), r.getRouteStatusListener())) - .map(r -> (MyRouteStatusListener)r.getRouteStatusListener()) + .map(r -> (DefaultRouteStatusListener)r.getRouteStatusListener()) .allMatch(l -> l.down().contains(LON) && l.up().contains(LON))); } } @@ -768,7 +768,12 @@ public void testFailover(Class cl) throws Exception { && m.dest() != null && m.dest().isSiteMaster()); c.getProtocolStack().insertProtocol(drop, ProtocolStack.Position.BELOW, UNICAST3.class); - a.send(new SiteMaster("nyc"), "hello"); + UNICAST3 unicast=a.getProtocolStack().findProtocol(UNICAST3.class); + RELAY relay=a.getProtocolStack().findProtocol(RELAY.class); + relay.delaySitesDown(true); + relay.setRouteStatusListener(new DefaultRouteStatusListener(() -> a.address()).verbose(true)); + Address target=new SiteMaster(NYC); + a.send(target, "hello"); // now kill C; D will take over as site master System.out.println("-- closing site master of nyc (C):"); @@ -777,6 +782,11 @@ public void testFailover(Class cl) throws Exception { () -> ((RELAY)d.getProtocolStack().findProtocol(RELAY.class)).isSiteMaster()); MyReceiver r=getReceiver(d); Util.waitUntil(2000, 200, () -> r.size() == 1); + + Table send_win=unicast.getSendWindow(target); + // wait until the ack from D has been received + Util.waitUntil(2000, 100, () -> send_win.getHighestDelivered() == send_win.getHighestReceived()); + relay.setRouteStatusListener(null); } /** Same as above, but now DROP is added to the bridge stack between "lon" and "nyc" */ @@ -847,23 +857,7 @@ protected void _testFailoverSiteDown(Class cl, Supplier
_C:nyc (site master) RELAY relay=a.getProtocolStack().findProtocol(RELAY.class); - - relay.setRouteStatusListener(new RouteStatusListener() { - @Override - public void sitesUp(String... sites) { - System.out.printf("** site-up(%s)\n", Arrays.toString(sites)); - } - - @Override - public void sitesDown(String... sites) { - System.out.printf("** site-down(%s)\n", Arrays.toString(sites)); - } - - @Override - public void sitesUnreachable(String... sites) { - System.out.printf("** sites-unreachable(%s)\n", Arrays.toString(sites)); - } - }); + relay.setRouteStatusListener(new DefaultRouteStatusListener(() -> relay.addr()).verbose(true)); JChannel bridge=relay.getBridge(NYC); bridge.getProtocolStack().insertProtocol(drop, ProtocolStack.Position.BELOW, UNICAST3.class); @@ -879,6 +873,7 @@ public void sitesUnreachable(String... sites) { drop.clearDownFilters(); UNICAST3 unicast=a.getProtocolStack().findProtocol(UNICAST3.class); Table send_win=unicast.getSendWindow(target); + relay.setRouteStatusListener(null); // check if there is still retransmission going on in A Util.waitUntil(5000, 1000, () -> { long highest_acked=send_win.getHighestDelivered(); // highest delivered == highest ack (sender win) diff --git a/tests/junit-functional/org/jgroups/tests/RelayTestAsym.java b/tests/junit-functional/org/jgroups/tests/RelayTestAsym.java index 605dce5c4d..d7f05667ea 100644 --- a/tests/junit-functional/org/jgroups/tests/RelayTestAsym.java +++ b/tests/junit-functional/org/jgroups/tests/RelayTestAsym.java @@ -2,9 +2,9 @@ import org.jgroups.*; import org.jgroups.protocols.UNICAST3; +import org.jgroups.protocols.relay.DefaultRouteStatusListener; import org.jgroups.protocols.relay.RELAY; import org.jgroups.protocols.relay.RELAY3; -import org.jgroups.protocols.relay.RouteStatusListener; import org.jgroups.protocols.relay.SiteMaster; import org.jgroups.util.MyReceiver; import org.jgroups.util.Table; @@ -113,36 +113,43 @@ protected void _testFailoverSiteDown(Supplier
s) throws Exception { y.setReceiver(new MyReceiver().verbose(true).name(y.name())); Util.closeReverse(m, n, o); // causes entire net2 site to be down - relay.setRouteStatusListener(new RouteStatusListener() { - @Override - public void sitesUp(String... sites) { - System.out.printf("** site-up(%s)\n", Arrays.toString(sites)); - } - - @Override - public void sitesDown(String... sites) { - System.out.printf("** site-down(%s)\n", Arrays.toString(sites)); - } - - @Override - public void sitesUnreachable(String... sites) { - System.out.printf("** sites-unreachable(%s)\n", Arrays.toString(sites)); - } - }); + relay.setRouteStatusListener(new DefaultRouteStatusListener(() -> a.address()).verbose(true)); a.send(target, "hello"); // won't succeed - MyReceiver r=getReceiver(y); - Util.waitUntil(2000, 200, () -> r.size() == 1); + Util.waitUntilTrue(2000, 200, () -> r.size() == 1); + + UNICAST3 unicast=a.getProtocolStack().findProtocol(UNICAST3.class); + Table send_win=unicast.getSendWindow(target); + // check if there is still retransmission going on in A + Util.waitUntil(5000, 1000, () -> { + long highest_acked=send_win.getHighestDelivered(); // highest delivered == highest ack (sender win) + long highest_sent=send_win.getHighestReceived(); // we use table as a *sender* win, so it's highest *sent*... + System.out.printf("** A -> %s: highest_sent: %d highest_acked: %d\n", target, highest_sent, highest_acked); + return highest_acked >= highest_sent; + }); + } + + /** A:lon sends a unicast message M to Y:net3, which left the cluster before. X:net3 (the site master) should + * send back a HOST-UNREACHABLE message to A:lon */ + public void testHostUnreachable() throws Exception { + setup(true); + waitForSiteMasters(true, a, d, m, x); + RELAY relay=a.getProtocolStack().findProtocol(RELAY.class); + Address target=y.address(); + relay.setRouteStatusListener(new DefaultRouteStatusListener(relay::addr).verbose(true)); + Util.close(y); + Util.waitUntil(5000, 200, () -> Stream.of(x,z).allMatch(ch -> ch.getView().size() == 2)); + System.out.printf("-- sending message to (crashed) %s:\n", target); + a.send(target, "hello"); - Util.closeReverse(m, n, o); UNICAST3 unicast=a.getProtocolStack().findProtocol(UNICAST3.class); Table send_win=unicast.getSendWindow(target); // check if there is still retransmission going on in A Util.waitUntil(5000, 1000, () -> { long highest_acked=send_win.getHighestDelivered(); // highest delivered == highest ack (sender win) long highest_sent=send_win.getHighestReceived(); // we use table as a *sender* win, so it's highest *sent*... - System.out.printf("** highest_sent: %d highest_acked: %d\n", highest_sent, highest_acked); + System.out.printf("** A -> %s: highest_sent: %d highest_acked: %d\n", target, highest_sent, highest_acked); return highest_acked >= highest_sent; }); } diff --git a/tests/junit-functional/org/jgroups/tests/RelayTests.java b/tests/junit-functional/org/jgroups/tests/RelayTests.java index 0e410b35b3..8f1ae91ea0 100644 --- a/tests/junit-functional/org/jgroups/tests/RelayTests.java +++ b/tests/junit-functional/org/jgroups/tests/RelayTests.java @@ -233,40 +233,6 @@ protected static String msgs(List channels) { .collect(Collectors.joining("\n")); } - protected static class MyRouteStatusListener implements RouteStatusListener { - protected final Address local_addr; - protected final List up=new ArrayList<>(), down=new ArrayList<>(); - protected boolean verbose; - - protected MyRouteStatusListener(Address local_addr) { - this.local_addr=local_addr; - } - - protected List up() {return up;} - protected List down() {return down;} - protected MyRouteStatusListener verbose(boolean b) {this.verbose=b; return this;} - protected boolean verbose() {return verbose;} - - @Override public synchronized void sitesUp(String... sites) { - if(verbose) - System.out.printf("%s: UP(%s)\n", local_addr, Arrays.toString(sites)); - up.addAll(Arrays.asList(sites)); - } - - @Override public synchronized void sitesDown(String... sites) { - if(verbose) - System.out.printf("%s: DOWN(%s)\n", local_addr, Arrays.toString(sites)); - down.addAll(Arrays.asList(sites)); - } - - protected synchronized MyRouteStatusListener clear() {up.clear(); down.clear(); return this;} - - @Override - public String toString() { - return String.format("down: %s, up: %s", down, up); - } - } - protected static class ResponseSender extends MyReceiver { protected final JChannel ch;