Skip to content

Commit

Permalink
Unblock sender on view change and stop() (https://issues.redhat.com/b…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Nov 29, 2024
1 parent d0c60e6 commit 4239dc7
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 90 deletions.
4 changes: 2 additions & 2 deletions src/org/jgroups/protocols/NAKACK4.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ protected void sendAck(Address to, Buffer<Message> win) {
}

@Override
protected void addToSendWindow(Buffer<Message> win, long seq, Message msg, Predicate<Message> filter) {
win.add(seq, msg, filter, sendOptions());
protected boolean addToSendBuffer(Buffer<Message> win, long seq, Message msg, Predicate<Message> filter, boolean dont_block) {
return win.add(seq, msg, filter, sendOptions(), dont_block);
}
}
14 changes: 7 additions & 7 deletions src/org/jgroups/protocols/ReliableMulticast.java
Original file line number Diff line number Diff line change
Expand Up @@ -718,23 +718,22 @@ protected void send(Message msg, Buffer<Message> win, boolean dont_loopback_set)
lock.lock();
}
try {
addToSendWindow(win, msg_id, msg, dont_loopback_set? remove_filter : null);
down_prot.down(msg); // if this fails, since msg is in sent_msgs, it can be retransmitted
if(addToSendBuffer(win, msg_id, msg, dont_loopback_set? remove_filter : null, msg.isFlagSet(DONT_BLOCK)))
down_prot.down(msg); // if this fails, since msg is in sent_msgs, it can be retransmitted
}
finally {
if(lock != null)
lock.unlock();
}
}

/**
* Adds the message to the send window. The loop tries to handle temporary OOMEs by retrying if add() failed.
*/
protected void addToSendWindow(Buffer<Message> win, long seq, Message msg, Predicate<Message> filter) {
/** Adds the message to the send buffer. The loop tries to handle temporary OOMEs by retrying if add() failed */
protected boolean addToSendBuffer(Buffer<Message> win, long seq, Message msg, Predicate<Message> filter, boolean dont_block) {
long sleep=10;
boolean rc=false;
do {
try {
win.add(seq, msg, filter, sendOptions());
rc=win.add(seq, msg, filter, sendOptions(), dont_block);
break;
}
catch(Throwable t) {
Expand All @@ -745,6 +744,7 @@ protected void addToSendWindow(Buffer<Message> win, long seq, Message msg, Predi
}
}
while(running);
return rc;
}

protected void resend(Message msg) { // needed for byteman ProtPerf script - don't remove!
Expand Down
64 changes: 41 additions & 23 deletions src/org/jgroups/protocols/ReliableUnicast.java
Original file line number Diff line number Diff line change
Expand Up @@ -689,9 +689,10 @@ public Object down(Message msg) {
}

SenderEntry entry=getSenderEntry(dst);
boolean dont_loopback_set=msg.isFlagSet(DONT_LOOPBACK) && dst.equals(local_addr);
send(msg, entry, dont_loopback_set);
num_msgs_sent.increment();
boolean dont_loopback_set=msg.isFlagSet(DONT_LOOPBACK) && dst.equals(local_addr),
dont_block=msg.isFlagSet(DONT_BLOCK);
if(send(msg, entry, dont_loopback_set, dont_block))
num_msgs_sent.increment();
return null; // the message was already sent down the stack in send()
}

Expand Down Expand Up @@ -745,17 +746,6 @@ public void removeSendConnection(Address mbr) {
}
}

public void removeSendConnection(Predicate<Address> pred) {
for(Iterator<Map.Entry<Address,SenderEntry>> it=send_table.entrySet().iterator(); it.hasNext();) {
Map.Entry<Address,SenderEntry> e=it.next();
Address addr=e.getKey();
if(pred.test(addr)) {
e.getValue().state(State.CLOSED);
it.remove();
}
}
}

public void removeReceiveConnection(Address mbr) {
sendPendingAcks();
ReceiverEntry entry=recv_table.remove(mbr);
Expand All @@ -768,6 +758,8 @@ public void removeReceiveConnection(Address mbr) {
*/
@ManagedOperation(description="Trashes all connections to other nodes. This is only used for testing")
public void removeAllConnections() {
for(SenderEntry se: send_table.values())
se.state(State.CLOSED);
send_table.clear();
recv_table.clear();
}
Expand Down Expand Up @@ -1014,8 +1006,6 @@ protected SenderEntry getSenderEntry(Address dst) {
if(cache != null && !members.contains(dst))
cache.add(dst);
}
if(entry.state() == State.CLOSING)
entry.state(State.OPEN);
return entry;
}

Expand Down Expand Up @@ -1092,7 +1082,7 @@ protected void handleXmitRequest(Address sender, SeqnoList missing) {
}
}

protected void send(Message msg, SenderEntry entry, boolean dont_loopback_set) {
protected boolean send(Message msg, SenderEntry entry, boolean dont_loopback_set, boolean dont_block) {
Buffer<Message> buf=entry.buf;
long seqno=entry.seqno.getAndIncrement();
short send_conn_id=entry.connId();
Expand All @@ -1107,8 +1097,12 @@ protected void send(Message msg, SenderEntry entry, boolean dont_loopback_set) {
lock.lock();
}
try {
addToSendWindow(buf, seqno, msg, dont_loopback_set? remove_filter : null);
boolean added=addToSendBuffer(buf, seqno, msg, dont_loopback_set? remove_filter : null, dont_block);
if(!added) // e.g. message already present in send buffer, or no space and dont_block set
return false;
down_prot.down(msg); // if this fails, since msg is in sent_msgs, it can be retransmitted
if(entry.state() == State.CLOSING)
entry.state(State.OPEN);
if(conn_expiry_timeout > 0)
entry.update();
if(dont_loopback_set)
Expand All @@ -1126,16 +1120,21 @@ protected void send(Message msg, SenderEntry entry, boolean dont_loopback_set) {
sb.append(')');
log.trace(sb);
}
return true;
}

/**
* Adds the message to the send window. The loop tries to handle temporary OOMEs by retrying if add() failed.
* Adds the message to the send buffer. The loop tries to handle temporary OOMEs by retrying if add() failed.
* @return True if added successfully. False if not, e.g. no space in buffer and DONT_BLOCK set, or message
* already present, or seqno lower than buffer.low
*/
protected void addToSendWindow(Buffer<Message> win, long seq, Message msg, Predicate<Message> filter) {
protected boolean addToSendBuffer(Buffer<Message> win, long seq, Message msg,
Predicate<Message> filter, boolean dont_block) {
long sleep=10;
boolean rc=false;
do {
try {
win.add(seq, msg, filter, sendOptions());
rc=win.add(seq, msg, filter, sendOptions(), dont_block);
break;
}
catch(Throwable t) {
Expand All @@ -1146,6 +1145,7 @@ protected void addToSendWindow(Buffer<Message> win, long seq, Message msg, Predi
}
}
while(running);
return rc;
}


Expand Down Expand Up @@ -1504,11 +1504,29 @@ protected Entry(short conn_id, Buffer<Message> buf) {
public Buffer<Message> buf() {return buf;}
public short connId() {return conn_id;}
protected void update() {timestamp.set(getTimestamp());}
protected State state() {return state;}
protected Entry state(State s) {if(this.state != s) {this.state=s; update();} return this;}
protected long age() {return MILLISECONDS.convert(getTimestamp() - timestamp.longValue(), NANOSECONDS);}
protected boolean needToSendAck() {return send_ack.compareAndSet(true, false);}
protected Entry sendAck() {send_ack.compareAndSet(false, true); return this;}
protected State state() {return state;}

protected Entry state(State s) {
if(state != s) {
switch(state) {
case OPEN:
if(s == State.CLOSED)
buf.open(false); // unblocks blocked senders
break;
case CLOSING:
buf.open(s != State.CLOSED);
break;
case CLOSED:
break;
}
state=s;
update();
}
return this;
}

/** Returns true if a real ACK should be sent. This is based on num_acks_sent being > ack_threshold */
public boolean update(int num_acks, final IntBinaryOperator op) {
Expand Down
2 changes: 0 additions & 2 deletions src/org/jgroups/protocols/UNICAST4.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,4 @@ protected boolean needToSendAck(Entry e, int num_acks) {
return e.update(num_acks, add_acks);
}



}
22 changes: 14 additions & 8 deletions src/org/jgroups/util/Buffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.jgroups.Message;
import org.jgroups.annotations.GuardedBy;

import java.io.Closeable;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
Expand All @@ -22,7 +23,7 @@
* @author Bela Ban
* @since 5.4
*/
public abstract class Buffer<T> implements Iterable<T> {
public abstract class Buffer<T> implements Iterable<T>, Closeable {
protected final Lock lock=new ReentrantLock();
protected final AtomicInteger adders=new AtomicInteger(0);
protected long offset;
Expand Down Expand Up @@ -54,8 +55,11 @@ public abstract class Buffer<T> implements Iterable<T> {

/** Returns the current capacity in the buffer. This value is fixed in a fixed-size buffer
* (e.g. {@link FixedBuffer}), but can change in a dynamic buffer ({@link DynamicBuffer}) */
public abstract int capacity();
public void resetStats() {}
public abstract int capacity();
public void resetStats() {}
public void open(boolean b) {}
@Override
public void close() {open(false);}

/**
* Adds an element if the element at the given index is null. Returns true if no element existed at the given index,
Expand All @@ -66,23 +70,25 @@ public void resetStats() {}
*/
// used: single message received
public boolean add(long seqno, T element) {
return add(seqno, element, null, Options.DEFAULT());
return add(seqno, element, null, Options.DEFAULT(), false);
}

/**
* Adds an element if the element at the given index is null. Returns true if no element existed at the given index,
* else returns false and doesn't set the element.
*
* @param seqno
* @param element
* @param seqno The seqno of the element
* @param element The element to be added
* @param remove_filter A filter used to remove all consecutive messages passing the filter (and non-null). This
* doesn't necessarily null a removed message, but may simply advance an index
* (e.g. highest delivered). Ignored if null.
* @param options
* @param options The options passed to the call
* @param dont_block If true, don't block when no space is available, but instead drop the element. This
* parameter is set by calling Message.isFlagSet(DONT_BLOCK)
* @return True if the element at the computed index was null, else false
*/
// used: send message
public abstract boolean add(long seqno, T element, Predicate<T> remove_filter, Options options);
public abstract boolean add(long seqno, T element, Predicate<T> remove_filter, Options options, boolean dont_block);

// used: MessageBatch received
public abstract boolean add(MessageBatch batch, Function<T,Long> seqno_getter, boolean remove_from_batch, T const_value);
Expand Down
7 changes: 3 additions & 4 deletions src/org/jgroups/util/DynamicBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public DynamicBuffer(int num_rows, int elements_per_row, long offset, double res
throw new IllegalArgumentException("resize_factor needs to be > 1");
}


public int getElementsPerRow() {return elements_per_row;}

/** Returns the total capacity in the matrix */
Expand Down Expand Up @@ -118,7 +117,7 @@ public void resetStats() {
* @return True if the element at the computed index was null, else false
*/
@Override
public boolean add(long seqno, T element, Predicate<T> remove_filter, Options __) {
public boolean add(long seqno, T element, Predicate<T> remove_filter, Options __, boolean ignored) {
lock.lock();
try {
if(seqno - hd <= 0)
Expand Down Expand Up @@ -185,7 +184,7 @@ public boolean add(MessageBatch batch, Function<T,Long> seqno_getter, boolean re
if(seqno < 0)
continue;
T element=const_value != null? const_value : msg;
boolean added=add(seqno, element, null, Options.DEFAULT());
boolean added=add(seqno, element, null, Options.DEFAULT(), false);
retval=retval || added;
if(!added || remove_from_batch)
it.remove();
Expand All @@ -212,7 +211,7 @@ public boolean add(final List<LongTuple<T>> list, boolean remove_added_elements,
LongTuple<T> tuple=it.next();
long seqno=tuple.getVal1();
T element=const_value != null? const_value : tuple.getVal2();
if(add(seqno, element, null, null))
if(add(seqno, element, null, null, false))
added=true;
else if(remove_added_elements)
it.remove();
Expand Down
28 changes: 9 additions & 19 deletions src/org/jgroups/util/FixedBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.jgroups.annotations.GuardedBy;

import java.io.Closeable;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
Expand All @@ -23,12 +22,12 @@
* @author Bela Ban
* @since 5.4
*/
public class FixedBuffer<T> extends Buffer<T> implements Closeable {
public class FixedBuffer<T> extends Buffer<T> {
/** Holds the elements */
protected T[] buf;
protected final Condition buffer_full=lock.newCondition();

/** Used to unblock blocked senders on close() */
/** Used to unblock blocked senders on close(). When false, senders don't block when full but discard element */
protected boolean open=true;

protected final LongAdder num_blockings=new LongAdder();
Expand Down Expand Up @@ -63,24 +62,16 @@ public FixedBuffer(int capacity, long offset) {
public AverageMinMax avgTimeBlocked() {return avg_time_blocked;}
public long numDroppedMessages() {return num_dropped_msgs.sum();}

/**
* Adds a new element to the buffer
* @param seqno The seqno of the element
* @param element The element
* @param remove_filter
* @param opts The options carried with this methods, e.g. whether to block when not enough space is available.
* @return True if the element was added, false otherwise.
*/
@Override
public boolean add(long seqno, T element, Predicate<T> remove_filter, Options opts) {
public boolean add(long seqno, T element, Predicate<T> remove_filter, Options opts, boolean dont_block) {
boolean block=opts != null && opts.block();
lock.lock();
try {
long dist=seqno - low;
if(dist <= 0)
return false;

if(dist > capacity() && (!block || !block(seqno))) { // no space for message
if(dist > capacity() && (!block || dont_block || !block(seqno))) { // no space for message
num_dropped_msgs.increment();
return false;
}
Expand All @@ -91,7 +82,7 @@ public boolean add(long seqno, T element, Predicate<T> remove_filter, Options op
buf[index]=element;
size++;

// see if high needs to moved forward
// see if high needs to be moved forward
if(seqno - high > 0)
high=seqno;

Expand Down Expand Up @@ -128,7 +119,7 @@ public boolean add(MessageBatch batch, Function<T,Long> seqno_getter, boolean re
if(seqno < 0)
continue;
T element=const_value != null? const_value : msg;
boolean added=add(seqno, element, null, Options.DEFAULT());
boolean added=add(seqno, element, null, Options.DEFAULT(), false);
retval=retval || added;
if(!added || remove_from_batch)
it.remove();
Expand All @@ -150,7 +141,7 @@ public boolean add(final List<LongTuple<T>> list, boolean remove_added_elements,
LongTuple<T> tuple=it.next();
long seqno=tuple.getVal1();
T element=const_value != null? const_value : tuple.getVal2();
if(add(seqno, element, null, null))
if(add(seqno, element, null, null, false))
added=true;
else if(remove_added_elements)
it.remove();
Expand Down Expand Up @@ -329,11 +320,10 @@ public void resetStats() {
avg_time_blocked.clear();
}

@Override
public void close() {
public void open(boolean b) {
lock.lock();
try {
open=false;
open=b;
buffer_full.signalAll();
}
finally {
Expand Down
Loading

0 comments on commit 4239dc7

Please sign in to comment.