From 4239dc7bc0ce0662f9ee0604d2135c4452c23994 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Wed, 27 Nov 2024 15:19:06 +0100 Subject: [PATCH] Unblock sender on view change and stop() (https://issues.redhat.com/browse/JGRP-2854) --- src/org/jgroups/protocols/NAKACK4.java | 4 +- .../jgroups/protocols/ReliableMulticast.java | 14 +- .../jgroups/protocols/ReliableUnicast.java | 64 +++--- src/org/jgroups/protocols/UNICAST4.java | 2 - src/org/jgroups/util/Buffer.java | 22 +- src/org/jgroups/util/DynamicBuffer.java | 7 +- src/org/jgroups/util/FixedBuffer.java | 28 +-- src/org/jgroups/util/Util.java | 10 +- .../org/jgroups/tests/BufferTest.java | 57 +++-- .../tests/ReliableUnicastBlockTest.java | 195 ++++++++++++++++++ .../jgroups/tests/RingBufferStressTest.java | 2 +- 11 files changed, 315 insertions(+), 90 deletions(-) create mode 100644 tests/junit-functional/org/jgroups/tests/ReliableUnicastBlockTest.java diff --git a/src/org/jgroups/protocols/NAKACK4.java b/src/org/jgroups/protocols/NAKACK4.java index e054aefd73..aac0a1c1a8 100644 --- a/src/org/jgroups/protocols/NAKACK4.java +++ b/src/org/jgroups/protocols/NAKACK4.java @@ -191,7 +191,7 @@ protected void sendAck(Address to, Buffer win) { } @Override - protected void addToSendWindow(Buffer win, long seq, Message msg, Predicate filter) { - win.add(seq, msg, filter, sendOptions()); + protected boolean addToSendBuffer(Buffer win, long seq, Message msg, Predicate filter, boolean dont_block) { + return win.add(seq, msg, filter, sendOptions(), dont_block); } } diff --git a/src/org/jgroups/protocols/ReliableMulticast.java b/src/org/jgroups/protocols/ReliableMulticast.java index 3b17b9a54a..5d6145f93b 100644 --- a/src/org/jgroups/protocols/ReliableMulticast.java +++ b/src/org/jgroups/protocols/ReliableMulticast.java @@ -718,8 +718,8 @@ protected void send(Message msg, Buffer win, boolean dont_loopback_set) lock.lock(); } try { - addToSendWindow(win, msg_id, msg, dont_loopback_set? remove_filter : null); - down_prot.down(msg); // if this fails, since msg is in sent_msgs, it can be retransmitted + if(addToSendBuffer(win, msg_id, msg, dont_loopback_set? remove_filter : null, msg.isFlagSet(DONT_BLOCK))) + down_prot.down(msg); // if this fails, since msg is in sent_msgs, it can be retransmitted } finally { if(lock != null) @@ -727,14 +727,13 @@ protected void send(Message msg, Buffer win, boolean dont_loopback_set) } } - /** - * Adds the message to the send window. The loop tries to handle temporary OOMEs by retrying if add() failed. - */ - protected void addToSendWindow(Buffer win, long seq, Message msg, Predicate filter) { + /** Adds the message to the send buffer. The loop tries to handle temporary OOMEs by retrying if add() failed */ + protected boolean addToSendBuffer(Buffer win, long seq, Message msg, Predicate filter, boolean dont_block) { long sleep=10; + boolean rc=false; do { try { - win.add(seq, msg, filter, sendOptions()); + rc=win.add(seq, msg, filter, sendOptions(), dont_block); break; } catch(Throwable t) { @@ -745,6 +744,7 @@ protected void addToSendWindow(Buffer win, long seq, Message msg, Predi } } while(running); + return rc; } protected void resend(Message msg) { // needed for byteman ProtPerf script - don't remove! diff --git a/src/org/jgroups/protocols/ReliableUnicast.java b/src/org/jgroups/protocols/ReliableUnicast.java index 704ebbb219..363af30036 100644 --- a/src/org/jgroups/protocols/ReliableUnicast.java +++ b/src/org/jgroups/protocols/ReliableUnicast.java @@ -689,9 +689,10 @@ public Object down(Message msg) { } SenderEntry entry=getSenderEntry(dst); - boolean dont_loopback_set=msg.isFlagSet(DONT_LOOPBACK) && dst.equals(local_addr); - send(msg, entry, dont_loopback_set); - num_msgs_sent.increment(); + boolean dont_loopback_set=msg.isFlagSet(DONT_LOOPBACK) && dst.equals(local_addr), + dont_block=msg.isFlagSet(DONT_BLOCK); + if(send(msg, entry, dont_loopback_set, dont_block)) + num_msgs_sent.increment(); return null; // the message was already sent down the stack in send() } @@ -745,17 +746,6 @@ public void removeSendConnection(Address mbr) { } } - public void removeSendConnection(Predicate
pred) { - for(Iterator> it=send_table.entrySet().iterator(); it.hasNext();) { - Map.Entry e=it.next(); - Address addr=e.getKey(); - if(pred.test(addr)) { - e.getValue().state(State.CLOSED); - it.remove(); - } - } - } - public void removeReceiveConnection(Address mbr) { sendPendingAcks(); ReceiverEntry entry=recv_table.remove(mbr); @@ -768,6 +758,8 @@ public void removeReceiveConnection(Address mbr) { */ @ManagedOperation(description="Trashes all connections to other nodes. This is only used for testing") public void removeAllConnections() { + for(SenderEntry se: send_table.values()) + se.state(State.CLOSED); send_table.clear(); recv_table.clear(); } @@ -1014,8 +1006,6 @@ protected SenderEntry getSenderEntry(Address dst) { if(cache != null && !members.contains(dst)) cache.add(dst); } - if(entry.state() == State.CLOSING) - entry.state(State.OPEN); return entry; } @@ -1092,7 +1082,7 @@ protected void handleXmitRequest(Address sender, SeqnoList missing) { } } - protected void send(Message msg, SenderEntry entry, boolean dont_loopback_set) { + protected boolean send(Message msg, SenderEntry entry, boolean dont_loopback_set, boolean dont_block) { Buffer buf=entry.buf; long seqno=entry.seqno.getAndIncrement(); short send_conn_id=entry.connId(); @@ -1107,8 +1097,12 @@ protected void send(Message msg, SenderEntry entry, boolean dont_loopback_set) { lock.lock(); } try { - addToSendWindow(buf, seqno, msg, dont_loopback_set? remove_filter : null); + boolean added=addToSendBuffer(buf, seqno, msg, dont_loopback_set? remove_filter : null, dont_block); + if(!added) // e.g. message already present in send buffer, or no space and dont_block set + return false; down_prot.down(msg); // if this fails, since msg is in sent_msgs, it can be retransmitted + if(entry.state() == State.CLOSING) + entry.state(State.OPEN); if(conn_expiry_timeout > 0) entry.update(); if(dont_loopback_set) @@ -1126,16 +1120,21 @@ protected void send(Message msg, SenderEntry entry, boolean dont_loopback_set) { sb.append(')'); log.trace(sb); } + return true; } /** - * Adds the message to the send window. The loop tries to handle temporary OOMEs by retrying if add() failed. + * Adds the message to the send buffer. The loop tries to handle temporary OOMEs by retrying if add() failed. + * @return True if added successfully. False if not, e.g. no space in buffer and DONT_BLOCK set, or message + * already present, or seqno lower than buffer.low */ - protected void addToSendWindow(Buffer win, long seq, Message msg, Predicate filter) { + protected boolean addToSendBuffer(Buffer win, long seq, Message msg, + Predicate filter, boolean dont_block) { long sleep=10; + boolean rc=false; do { try { - win.add(seq, msg, filter, sendOptions()); + rc=win.add(seq, msg, filter, sendOptions(), dont_block); break; } catch(Throwable t) { @@ -1146,6 +1145,7 @@ protected void addToSendWindow(Buffer win, long seq, Message msg, Predi } } while(running); + return rc; } @@ -1504,11 +1504,29 @@ protected Entry(short conn_id, Buffer buf) { public Buffer buf() {return buf;} public short connId() {return conn_id;} protected void update() {timestamp.set(getTimestamp());} - protected State state() {return state;} - protected Entry state(State s) {if(this.state != s) {this.state=s; update();} return this;} protected long age() {return MILLISECONDS.convert(getTimestamp() - timestamp.longValue(), NANOSECONDS);} protected boolean needToSendAck() {return send_ack.compareAndSet(true, false);} protected Entry sendAck() {send_ack.compareAndSet(false, true); return this;} + protected State state() {return state;} + + protected Entry state(State s) { + if(state != s) { + switch(state) { + case OPEN: + if(s == State.CLOSED) + buf.open(false); // unblocks blocked senders + break; + case CLOSING: + buf.open(s != State.CLOSED); + break; + case CLOSED: + break; + } + state=s; + update(); + } + return this; + } /** Returns true if a real ACK should be sent. This is based on num_acks_sent being > ack_threshold */ public boolean update(int num_acks, final IntBinaryOperator op) { diff --git a/src/org/jgroups/protocols/UNICAST4.java b/src/org/jgroups/protocols/UNICAST4.java index 4f078d1d4f..15313edc63 100644 --- a/src/org/jgroups/protocols/UNICAST4.java +++ b/src/org/jgroups/protocols/UNICAST4.java @@ -102,6 +102,4 @@ protected boolean needToSendAck(Entry e, int num_acks) { return e.update(num_acks, add_acks); } - - } diff --git a/src/org/jgroups/util/Buffer.java b/src/org/jgroups/util/Buffer.java index 23020f6827..2a55a9a772 100644 --- a/src/org/jgroups/util/Buffer.java +++ b/src/org/jgroups/util/Buffer.java @@ -3,6 +3,7 @@ import org.jgroups.Message; import org.jgroups.annotations.GuardedBy; +import java.io.Closeable; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -22,7 +23,7 @@ * @author Bela Ban * @since 5.4 */ -public abstract class Buffer implements Iterable { +public abstract class Buffer implements Iterable, Closeable { protected final Lock lock=new ReentrantLock(); protected final AtomicInteger adders=new AtomicInteger(0); protected long offset; @@ -54,8 +55,11 @@ public abstract class Buffer implements Iterable { /** Returns the current capacity in the buffer. This value is fixed in a fixed-size buffer * (e.g. {@link FixedBuffer}), but can change in a dynamic buffer ({@link DynamicBuffer}) */ - public abstract int capacity(); - public void resetStats() {} + public abstract int capacity(); + public void resetStats() {} + public void open(boolean b) {} + @Override + public void close() {open(false);} /** * Adds an element if the element at the given index is null. Returns true if no element existed at the given index, @@ -66,23 +70,25 @@ public void resetStats() {} */ // used: single message received public boolean add(long seqno, T element) { - return add(seqno, element, null, Options.DEFAULT()); + return add(seqno, element, null, Options.DEFAULT(), false); } /** * Adds an element if the element at the given index is null. Returns true if no element existed at the given index, * else returns false and doesn't set the element. * - * @param seqno - * @param element + * @param seqno The seqno of the element + * @param element The element to be added * @param remove_filter A filter used to remove all consecutive messages passing the filter (and non-null). This * doesn't necessarily null a removed message, but may simply advance an index * (e.g. highest delivered). Ignored if null. - * @param options + * @param options The options passed to the call + * @param dont_block If true, don't block when no space is available, but instead drop the element. This + * parameter is set by calling Message.isFlagSet(DONT_BLOCK) * @return True if the element at the computed index was null, else false */ // used: send message - public abstract boolean add(long seqno, T element, Predicate remove_filter, Options options); + public abstract boolean add(long seqno, T element, Predicate remove_filter, Options options, boolean dont_block); // used: MessageBatch received public abstract boolean add(MessageBatch batch, Function seqno_getter, boolean remove_from_batch, T const_value); diff --git a/src/org/jgroups/util/DynamicBuffer.java b/src/org/jgroups/util/DynamicBuffer.java index caabb42c29..ebd9e850c2 100644 --- a/src/org/jgroups/util/DynamicBuffer.java +++ b/src/org/jgroups/util/DynamicBuffer.java @@ -84,7 +84,6 @@ public DynamicBuffer(int num_rows, int elements_per_row, long offset, double res throw new IllegalArgumentException("resize_factor needs to be > 1"); } - public int getElementsPerRow() {return elements_per_row;} /** Returns the total capacity in the matrix */ @@ -118,7 +117,7 @@ public void resetStats() { * @return True if the element at the computed index was null, else false */ @Override - public boolean add(long seqno, T element, Predicate remove_filter, Options __) { + public boolean add(long seqno, T element, Predicate remove_filter, Options __, boolean ignored) { lock.lock(); try { if(seqno - hd <= 0) @@ -185,7 +184,7 @@ public boolean add(MessageBatch batch, Function seqno_getter, boolean re if(seqno < 0) continue; T element=const_value != null? const_value : msg; - boolean added=add(seqno, element, null, Options.DEFAULT()); + boolean added=add(seqno, element, null, Options.DEFAULT(), false); retval=retval || added; if(!added || remove_from_batch) it.remove(); @@ -212,7 +211,7 @@ public boolean add(final List> list, boolean remove_added_elements, LongTuple tuple=it.next(); long seqno=tuple.getVal1(); T element=const_value != null? const_value : tuple.getVal2(); - if(add(seqno, element, null, null)) + if(add(seqno, element, null, null, false)) added=true; else if(remove_added_elements) it.remove(); diff --git a/src/org/jgroups/util/FixedBuffer.java b/src/org/jgroups/util/FixedBuffer.java index 05375c377b..4ffd3dcdff 100644 --- a/src/org/jgroups/util/FixedBuffer.java +++ b/src/org/jgroups/util/FixedBuffer.java @@ -2,7 +2,6 @@ import org.jgroups.annotations.GuardedBy; -import java.io.Closeable; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; @@ -23,12 +22,12 @@ * @author Bela Ban * @since 5.4 */ -public class FixedBuffer extends Buffer implements Closeable { +public class FixedBuffer extends Buffer { /** Holds the elements */ protected T[] buf; protected final Condition buffer_full=lock.newCondition(); - /** Used to unblock blocked senders on close() */ + /** Used to unblock blocked senders on close(). When false, senders don't block when full but discard element */ protected boolean open=true; protected final LongAdder num_blockings=new LongAdder(); @@ -63,16 +62,8 @@ public FixedBuffer(int capacity, long offset) { public AverageMinMax avgTimeBlocked() {return avg_time_blocked;} public long numDroppedMessages() {return num_dropped_msgs.sum();} - /** - * Adds a new element to the buffer - * @param seqno The seqno of the element - * @param element The element - * @param remove_filter - * @param opts The options carried with this methods, e.g. whether to block when not enough space is available. - * @return True if the element was added, false otherwise. - */ @Override - public boolean add(long seqno, T element, Predicate remove_filter, Options opts) { + public boolean add(long seqno, T element, Predicate remove_filter, Options opts, boolean dont_block) { boolean block=opts != null && opts.block(); lock.lock(); try { @@ -80,7 +71,7 @@ public boolean add(long seqno, T element, Predicate remove_filter, Options op if(dist <= 0) return false; - if(dist > capacity() && (!block || !block(seqno))) { // no space for message + if(dist > capacity() && (!block || dont_block || !block(seqno))) { // no space for message num_dropped_msgs.increment(); return false; } @@ -91,7 +82,7 @@ public boolean add(long seqno, T element, Predicate remove_filter, Options op buf[index]=element; size++; - // see if high needs to moved forward + // see if high needs to be moved forward if(seqno - high > 0) high=seqno; @@ -128,7 +119,7 @@ public boolean add(MessageBatch batch, Function seqno_getter, boolean re if(seqno < 0) continue; T element=const_value != null? const_value : msg; - boolean added=add(seqno, element, null, Options.DEFAULT()); + boolean added=add(seqno, element, null, Options.DEFAULT(), false); retval=retval || added; if(!added || remove_from_batch) it.remove(); @@ -150,7 +141,7 @@ public boolean add(final List> list, boolean remove_added_elements, LongTuple tuple=it.next(); long seqno=tuple.getVal1(); T element=const_value != null? const_value : tuple.getVal2(); - if(add(seqno, element, null, null)) + if(add(seqno, element, null, null, false)) added=true; else if(remove_added_elements) it.remove(); @@ -329,11 +320,10 @@ public void resetStats() { avg_time_blocked.clear(); } - @Override - public void close() { + public void open(boolean b) { lock.lock(); try { - open=false; + open=b; buffer_full.signalAll(); } finally { diff --git a/src/org/jgroups/util/Util.java b/src/org/jgroups/util/Util.java index 1ae4c7d8b9..f89803d064 100644 --- a/src/org/jgroups/util/Util.java +++ b/src/org/jgroups/util/Util.java @@ -557,12 +557,8 @@ public static void closeFast(JChannel ... channels) { * the view until failure detection has kicked in and the new coord installed the new view */ public static void shutdown(JChannel ch) throws Exception { - DISCARD discard=new DISCARD(); - discard.setAddress(ch.getAddress()); - discard.discardAll(true); - ProtocolStack stack=ch.getProtocolStack(); - TP transport=stack.getTransport(); - stack.insertProtocol(discard,ProtocolStack.Position.ABOVE,transport.getClass()); + DISCARD discard=new DISCARD().setAddress(ch.getAddress()).discardAll(true); + ch.stack().insertProtocol(discard,ProtocolStack.Position.ABOVE,TP.class); //abruptly shutdown FD_SOCK just as in real life when member gets killed non gracefully FD_SOCK fd=ch.getProtocolStack().findProtocol(FD_SOCK.class); @@ -578,7 +574,7 @@ public static void shutdown(JChannel ch) throws Exception { View new_view=new View(new_vid,members); // inject view in which the shut-down member is the only element - GMS gms=stack.findProtocol(GMS.class); + GMS gms=ch.stack().findProtocol(GMS.class); gms.installView(new_view); } Util.close(ch); diff --git a/tests/junit-functional/org/jgroups/tests/BufferTest.java b/tests/junit-functional/org/jgroups/tests/BufferTest.java index 92353cd418..39baa94183 100644 --- a/tests/junit-functional/org/jgroups/tests/BufferTest.java +++ b/tests/junit-functional/org/jgroups/tests/BufferTest.java @@ -466,14 +466,14 @@ public void testAddAndRemove(Buffer buf) { buf.add(3, msg(3)); assert buf.high() == 3; - buf.add(4, msg(4, true), dont_loopback_filter, Options.DEFAULT()); + buf.add(4, msg(4, true), dont_loopback_filter, Options.DEFAULT(), false); assert buf.highestDelivered() == 2; assert buf.getHighestDeliverable() == 4; buf.removeMany(false, 10); assert buf.highestDelivered() == 4; - buf.add(5, msg(5, true), dont_loopback_filter, Options.DEFAULT()); - buf.add(6, msg(6, true), dont_loopback_filter, Options.DEFAULT()); + buf.add(5, msg(5, true), dont_loopback_filter, Options.DEFAULT(), false); + buf.add(6, msg(6, true), dont_loopback_filter, Options.DEFAULT(), false); assert buf.highestDelivered() == 6; assert IntStream.rangeClosed(1,2).allMatch(n -> buf._get(n) == null); assert IntStream.rangeClosed(3,6).allMatch(n -> buf._get(n) != null); @@ -481,7 +481,7 @@ public void testAddAndRemove(Buffer buf) { public void testAddAndRemove2(Buffer buf) { for(int i=1; i <=10; i++) - buf.add(i, msg(i, true), dont_loopback_filter, Options.DEFAULT()); + buf.add(i, msg(i, true), dont_loopback_filter, Options.DEFAULT(), false); assert buf.highestDelivered() == 10; assert buf.high() == 10; assert buf.getHighestDeliverable() == 10; @@ -496,9 +496,9 @@ public void testAddAndRemove2(Buffer buf) { public void testAddAndRemove3(Buffer type) { Buffer buf=type instanceof DynamicBuffer? new DynamicBuffer<>(3, 10, 3) : new FixedBuffer<>(3); - buf.add(5, msg(5, true), dont_loopback_filter, Options.DEFAULT()); - buf.add(6, msg(6, true), dont_loopback_filter, Options.DEFAULT()); - buf.add(4, msg(4, true), dont_loopback_filter, Options.DEFAULT()); + buf.add(5, msg(5, true), dont_loopback_filter, Options.DEFAULT(), false); + buf.add(6, msg(6, true), dont_loopback_filter, Options.DEFAULT(), false); + buf.add(4, msg(4, true), dont_loopback_filter, Options.DEFAULT(), false); assert buf.high() == 6; assert buf.getHighestDeliverable() == 6; assert buf.highestDelivered() == 6; @@ -508,9 +508,9 @@ public void testAddAndRemove3(Buffer type) { public void testAddAndRemove4(Buffer type) { Buffer buf=type instanceof DynamicBuffer? new DynamicBuffer<>(3, 10, 3) : new FixedBuffer<>(3); - buf.add(7, msg(7, true), dont_loopback_filter, Options.DEFAULT()); - buf.add(6, msg(6, true), dont_loopback_filter, Options.DEFAULT()); - buf.add(4, msg(4, true), dont_loopback_filter, Options.DEFAULT()); + buf.add(7, msg(7, true), dont_loopback_filter, Options.DEFAULT(), false); + buf.add(6, msg(6, true), dont_loopback_filter, Options.DEFAULT(), false); + buf.add(4, msg(4, true), dont_loopback_filter, Options.DEFAULT(), false); assert buf.high() == 7; assert buf.getHighestDeliverable() == 4; assert buf.highestDelivered() == 4; @@ -1787,18 +1787,41 @@ public void testBlockingAddAndClose(Buffer type) { return; FixedBuffer buf=new FixedBuffer<>(10, 0); for(int i=0; i <= 10; i++) - buf.add(i, i, null, OPTS); + buf.add(i, i, null, OPTS, false); System.out.println("buf = " + buf); new Thread(() -> { Util.sleep(1000); buf.close(); }).start(); int seqno=buf.capacity() +1; - boolean success=buf.add(seqno, seqno, null, OPTS); + boolean success=buf.add(seqno, seqno, null, OPTS, false); System.out.println("buf=" + buf); assert !success; assert buf.size() == 10; assert buf.numMissing() == 0; + + List list=buf.removeMany(true, 5); + assert list.size() == 5; + List expected=IntStream.rangeClosed(1, 5).boxed().collect(Collectors.toList()); + assert expected.equals(list); + + boolean added=buf.add(11, 11); + assert added && buf.size() == 6; + + for(int i=12; i <= 20; i++) // 16-20 will be dropped as open==false + buf.add(i,i); + assert buf.size() == 10; + expected=IntStream.rangeClosed(6,15).boxed().collect(Collectors.toList()); + List actual=buf.removeMany(true, 20); + assert actual.equals(expected); + assert buf.isEmpty(); + buf.open(true); + for(int i=16; i <= 25; i++) + buf.add(i,i); + assert buf.size() == 10; + expected=IntStream.rangeClosed(16,25).boxed().collect(Collectors.toList()); + actual=buf.removeMany(true, 20); + assert actual.equals(expected); } public void testBlockingAddAndPurge(Buffer type) throws InterruptedException { @@ -1806,7 +1829,7 @@ public void testBlockingAddAndPurge(Buffer type) throws InterruptedExce return; final FixedBuffer buf=new FixedBuffer<>(10, 0); for(int i=0; i <= 10; i++) - buf.add(i, i, null, OPTS); + buf.add(i, i, null, OPTS, false); System.out.println("buf = " + buf); Thread thread=new Thread(() -> { Util.sleep(1000); @@ -1815,7 +1838,7 @@ public void testBlockingAddAndPurge(Buffer type) throws InterruptedExce buf.purge(3); }); thread.start(); - boolean success=buf.add(11, 11, null, OPTS); + boolean success=buf.add(11, 11, null, OPTS, false); System.out.println("buf=" + buf); assert success; thread.join(10000); @@ -1827,7 +1850,7 @@ public void testBlockingAddAndPurge2(Buffer type) throws TimeoutExcepti if(type instanceof DynamicBuffer) return; final FixedBuffer buf=new FixedBuffer<>(10, 0); - IntStream.rangeClosed(1, buf.capacity()).boxed().forEach(n -> buf.add(n, n, null, OPTS)); + IntStream.rangeClosed(1, buf.capacity()).boxed().forEach(n -> buf.add(n, n, null, OPTS, false)); System.out.println("buf = " + buf); BlockingAdder adder=new BlockingAdder(buf, 11, 14); adder.start(); @@ -2021,7 +2044,7 @@ public boolean success() { public void run() { try { latch.await(); - success=buf.add(seqno, seqno, null, new Options().block(true)); + success=buf.add(seqno, seqno, null, new Options().block(true), false); } catch(InterruptedException e) { e.printStackTrace(); @@ -2046,7 +2069,7 @@ protected BlockingAdder(FixedBuffer buf, int from, int i) { @Override public void run() { for(int i=from; i <= to; i++) { - boolean rc=buf.add(i, i, null, opts); + boolean rc=buf.add(i, i, null, opts, false); if(rc) { added++; System.out.printf("-- added %d\n", i); diff --git a/tests/junit-functional/org/jgroups/tests/ReliableUnicastBlockTest.java b/tests/junit-functional/org/jgroups/tests/ReliableUnicastBlockTest.java new file mode 100644 index 0000000000..eb351edb9c --- /dev/null +++ b/tests/junit-functional/org/jgroups/tests/ReliableUnicastBlockTest.java @@ -0,0 +1,195 @@ +package org.jgroups.tests; + +import org.jgroups.*; +import org.jgroups.protocols.DISCARD; +import org.jgroups.protocols.ReliableUnicast; +import org.jgroups.protocols.TP; +import org.jgroups.protocols.UNICAST4; +import org.jgroups.protocols.pbcast.GMS; +import org.jgroups.stack.ProtocolStack; +import org.jgroups.util.MyReceiver; +import org.jgroups.util.Util; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * Tests {@link org.jgroups.protocols.UNICAST4} and other subclasses of {@link org.jgroups.protocols.ReliableUnicast} + * for (sender-)blocking operations + * @author Bela Ban + * @since 5.4 + */ +@Test(groups=Global.FUNCTIONAL,singleThreaded=true) +public class ReliableUnicastBlockTest { + protected JChannel a,b,c; + protected MyReceiver rb, rc; + protected static long CONN_CLOSE_TIMEOUT=2000; + protected static final List EXPECTED=IntStream.rangeClosed(1,10).boxed().collect(Collectors.toList()); + + @BeforeMethod + protected void setup() throws Exception { + a=new JChannel(Util.getTestStackNew()).name("A"); + ((UNICAST4)a.stack().findProtocol(UNICAST4.class)).capacity(5); + ReliableUnicast u=a.stack().findProtocol(ReliableUnicast.class); + u.setConnCloseTimeout(CONN_CLOSE_TIMEOUT); + b=new JChannel(Util.getTestStackNew()).name("B").receiver(rb=new MyReceiver().name("B")); + c=new JChannel(Util.getTestStackNew()).name("C").receiver(rc=new MyReceiver().name("C")); + Stream.of(a,b,c).map(ch -> ch.stack().getTransport().getDiagnosticsHandler()).forEach(d -> d.setEnabled(true)); + a.connect("ReliableUnicastBlockTest"); + b.connect("ReliableUnicastBlockTest"); + c.connect("ReliableUnicastBlockTest"); + Util.waitUntilAllChannelsHaveSameView(2000, 100, a,b,c); + } + + @AfterMethod + protected void destroy() { + Util.close(c,b,a); + } + + /** Tests A sending to B and C and blocking on waiting for ACKs from B, then B leaves -> this should unblock A */ + public void testSenderBlockingAndViewChange() throws Exception { + final Address target_b=b.address(), target_c=c.address(); + Util.shutdown(b); + Sender sender=new Sender(a, target_b, target_c); + sender.start(); // will block after sending 5 unicasts to B + // Wait until sender blocks: seqno >= 5 + Util.waitUntilTrue(2000, 100, () -> sender.seqno() >= 5); + + // inject view change excluding B + View view=View.create(a.address(), 10L, a.address(), c.address()); + System.out.printf("-- installing view %s\n", view); + GMS gms=a.stack().findProtocol(GMS.class); + // This closes the conn to B (state: CLOSING). When the entry to B is removed (state: CLOSED), the sender to B + // will be unblocked and the messages to C can be sent + gms.installView(view); + + // B dropped all messages: + Util.waitUntilTrue(1000, 100, () -> rb.size() > 0); + assert rb.size() == 0; + + // Waits until conn_close_timeout (2s in this test) kicks in and removes the conn to B, unblocking the sender + Util.waitUntil(2000, 100, () -> !sender.isAlive()); + + // C received all 10 messages: + Util.waitUntil(2000, 100, () -> rc.size() == 10, () -> print(rb,rc)); + System.out.printf("-- received msgs:\n%s\n", print(rb,rc)); + assert rc.list().equals(EXPECTED); + } + + /** A blocks sending message to B, then A is closed */ + public void testSenderBlockingAndChannelCloseA() throws Exception { + Util.close(c); + final Address target_b=b.address(); + Util.shutdown(b); + Sender sender=new Sender(a, target_b); + sender.start(); // will block + // Wait until sender blocks: seqno >= 5 + Util.waitUntilTrue(2000, 100, () -> sender.seqno() >= 5); + Util.close(a); + Util.waitUntil(2000, 100, () -> !sender.isAlive()); + } + + /** A blocks sending messages to B, then B is closed */ + public void testSenderBlockingAndChannelCloseB() throws Exception { + Util.close(c); + final Address target_b=b.address(); + DISCARD discard=new DISCARD().discardAll(true); + b.stack().insertProtocol(discard, ProtocolStack.Position.ABOVE, TP.class); + Sender sender=new Sender(a, target_b); + sender.start(); // will block + // Wait until sender blocks: seqno >= 5 + Util.waitUntilTrue(2000, 100, () -> sender.seqno() >= 5); + + // inject view change excluding B + View view=View.create(a.address(), 10L, a.address()); + System.out.printf("-- installing view %s\n", view); + GMS gms=a.stack().findProtocol(GMS.class); + gms.installView(view); // this should unblock the sender thread above + + // B dropped all messages: + Util.waitUntilTrue(1000, 100, () -> rb.size() > 0); + assert rb.size() == 0; + Util.waitUntil(2000, 100, () -> !sender.isAlive()); + } + + /** A blocks sending to B. Then A's connection to B is closed (state: CLOSING), the reopened (state: OPEN). + * A should now again be able to send messages to B (as soon as B's DISCARD has been removed). This mimicks + * a network partition which subsequently heals */ + public void testConnectionCloseThenReopen() throws Exception { + Util.close(c); + ReliableUnicast u=a.stack().findProtocol(ReliableUnicast.class); + u.setConnCloseTimeout(60_000); // to give the MergeView a chance to re-open the connection to B + final Address target_b=b.address(); + DISCARD discard=new DISCARD().discardAll(true); + b.stack().insertProtocol(discard, ProtocolStack.Position.ABOVE, TP.class); + Sender sender=new Sender(a, target_b); + sender.start(); // will block + // Wait until sender blocks: seqno >= 5 + Util.waitUntilTrue(2000, 100, () -> sender.seqno() >= 5); + + // inject view change excluding B + View view=View.create(a.address(), 10L, a.address()); + System.out.printf("-- installing view %s\n", view); + GMS gms=a.stack().findProtocol(GMS.class); + gms.installView(view); + + Util.waitUntilTrue(2000, 100, () -> !sender.isAlive()); + assert sender.isAlive(); + + View view_b=View.create(b.address(), 10L, b.address()); + ViewId vid=new ViewId(a.address(), 12L); + MergeView mv=new MergeView(vid, List.of(a.address(), b.address()), List.of(view, view_b)); + System.out.printf("-- Installing view %s\n", mv); + gms.installView(mv); + discard.discardAll(false); + GMS gms_b=b.stack().findProtocol(GMS.class); + gms_b.installView(mv); + + Util.waitUntil(2000, 100, () -> rb.size() == 10); + assert rb.list().equals(EXPECTED); + System.out.printf("-- rb: %s\n", print(rb)); + } + + protected static class Sender extends Thread { + protected final JChannel ch; + protected final Address[] targets; + protected int seqno; + + protected Sender(JChannel ch, Address ... targets) { + this.ch=ch; + this.targets=targets; + setName("sender-thread"); + } + + public int seqno() { + return seqno; + } + + public void run() { + System.out.printf("A sending %d messages to %s\n", 10, Arrays.toString(targets)); + for(seqno=1; seqno <= 10; seqno++) { + try { + for(Address target: targets) { + ch.send(target, seqno); + System.out.printf("-- sent msg #%d to %s\n", seqno, target); + } + } + catch(Exception ex) { + System.out.printf("-- received exception as expected: %s\n", ex); + break; + } + } + } + } + + @SafeVarargs + protected static String print(MyReceiver ... receivers) { + return Stream.of(receivers).map(r -> String.format("%s: %s", r.name(), r.list())).collect(Collectors.joining("\n")); + } +} diff --git a/tests/stress/org/jgroups/tests/RingBufferStressTest.java b/tests/stress/org/jgroups/tests/RingBufferStressTest.java index 39a209fc37..6c2728b733 100644 --- a/tests/stress/org/jgroups/tests/RingBufferStressTest.java +++ b/tests/stress/org/jgroups/tests/RingBufferStressTest.java @@ -93,7 +93,7 @@ public void run() { num.decrementAndGet(); break; } - buf.add(seqno, MSG, null, OPTS); + buf.add(seqno, MSG, null, OPTS, false); } } }