Skip to content

Commit

Permalink
- max_batch_size > 0 doesn't remove all messages (https://issues.redh…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Apr 16, 2024
1 parent e466ee6 commit ab03668
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 20 deletions.
9 changes: 9 additions & 0 deletions src/org/jgroups/JChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ else if(ip_version == StackType.Dual)
public JChannel stats(boolean stats) {this.stats=stats; return this;}
public boolean getDiscardOwnMessages() {return discard_own_messages;}
public JChannel setDiscardOwnMessages(boolean flag) {discard_own_messages=flag; return this;}

@Deprecated(since="5.3.5",forRemoval=true)
public boolean flushSupported() {return flush_supported;}


Expand Down Expand Up @@ -316,6 +318,7 @@ public synchronized JChannel connect(String cluster_name) throws Exception {
}

/** Connects the channel to a cluster. */
@Deprecated(since="5.3.5",forRemoval=true)
@ManagedOperation(description="Connects the channel to a group")
protected synchronized JChannel connect(String cluster_name, boolean useFlushIfPresent) throws Exception {
if(!_preConnect(cluster_name))
Expand Down Expand Up @@ -360,6 +363,7 @@ public synchronized JChannel connect(String cluster_name, Address target, long t
* @exception StateTransferException State transfer was not successful
*
*/
@Deprecated(since="5.3.5",forRemoval=true)
public synchronized JChannel connect(String cluster_name, Address target, long timeout,
boolean useFlushIfPresent) throws Exception {
if(!_preConnect(cluster_name))
Expand Down Expand Up @@ -506,6 +510,7 @@ public JChannel getState(Address target, long timeout) throws Exception {


/** Retrieves state from the target member. See {@link #getState(Address,long)} for details */
@Deprecated(since="5.3.5",forRemoval=true)
public JChannel getState(Address target, long timeout, boolean useFlushIfPresent) throws Exception {
Callable<Boolean> flusher =() -> Util.startFlush(JChannel.this);
return getState(target, timeout, useFlushIfPresent?flusher:null);
Expand All @@ -521,6 +526,7 @@ public JChannel getState(Address target, long timeout, boolean useFlushIfPresent
* JGroups provides a helper random sleep time backoff algorithm for flush using Util class.
* @param automatic_resume if true call {@link #stopFlush()} after the flush
*/
@Deprecated(since="5.3.5",forRemoval=true)
public JChannel startFlush(boolean automatic_resume) throws Exception {
if(!flushSupported())
throw new IllegalStateException("Flush is not supported, add pbcast.FLUSH protocol to your configuration");
Expand All @@ -545,6 +551,7 @@ public JChannel startFlush(boolean automatic_resume) throws Exception {
* {@link #stopFlush(List)} method with the same list of members used in {@link #startFlush(List, boolean)}.
* @param automatic_resume if true call {@link #stopFlush()} after the flush
*/
@Deprecated(since="5.3.5",forRemoval=true)
public JChannel startFlush(List<Address> flushParticipants, boolean automatic_resume) throws Exception {
if (!flushSupported())
throw new IllegalStateException("Flush is not supported, add pbcast.FLUSH protocol to your configuration");
Expand All @@ -567,6 +574,7 @@ public JChannel startFlush(List<Address> flushParticipants, boolean automatic_re
}

/** Stops the current flush round. Cluster members are unblocked and allowed to send new and pending messages */
@Deprecated(since="5.3.5",forRemoval=true)
public JChannel stopFlush() {
if(!flushSupported())
throw new IllegalStateException("Flush is not supported, add pbcast.FLUSH protocol to your configuration");
Expand All @@ -581,6 +589,7 @@ public JChannel stopFlush() {
* same list of members prior to invocation of this method.
* @param flushParticipants the flush participants
*/
@Deprecated(since="5.3.5",forRemoval=true)
public JChannel stopFlush(List<Address> flushParticipants) {
if(!flushSupported())
throw new IllegalStateException("Flush is not supported, add pbcast.FLUSH protocol to your configuration");
Expand Down
12 changes: 6 additions & 6 deletions src/org/jgroups/protocols/UNICAST3.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {

@Property(description="Max number of messages to ask for in a retransmit request. 0 disables this and uses " +
"the max bundle size in the transport")
protected int max_xmit_req_size=1024;
protected int max_xmit_req_size;

@Property(description="The max size of a message batch when delivering messages. 0 is unbounded")
protected int max_batch_size;
Expand Down Expand Up @@ -949,14 +949,14 @@ protected void removeAndDeliver(Table<Message> win, Address sender) {
if(adders.getAndIncrement() != 0)
return;

final MessageBatch batch=new MessageBatch(win.getNumDeliverable())
.dest(local_addr).sender(sender).multicast(false);
MessageBatch batch=new MessageBatch(win.getNumDeliverable()).dest(local_addr).sender(sender).multicast(false);
Supplier<MessageBatch> batch_creator=() -> batch;
MessageBatch mb=null;
do {
try {
batch.reset(); // sets index to 0: important as batch delivery may not remove messages from batch!
win.removeMany(true, max_batch_size, drop_oob_and_dont_loopback_msgs_filter,
batch_creator, BATCH_ACCUMULATOR);
mb=win.removeMany(true, max_batch_size, drop_oob_and_dont_loopback_msgs_filter,
batch_creator, BATCH_ACCUMULATOR);
}
catch(Throwable t) {
log.error("%s: failed removing messages from table for %s: %s", local_addr, sender, t);
Expand All @@ -966,7 +966,7 @@ protected void removeAndDeliver(Table<Message> win, Address sender) {
deliverBatch(batch); // catches Throwable
}
}
while(adders.decrementAndGet() != 0);
while(mb != null || adders.decrementAndGet() != 0);
}


Expand Down
32 changes: 23 additions & 9 deletions src/org/jgroups/protocols/pbcast/NAKACK2.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ public class NAKACK2 extends Protocol implements DiagnosticsHandler.ProbeHandler
@ManagedAttribute(description="Number of retransmit responses sent",type=AttributeType.SCALAR)
protected final LongAdder xmit_rsps_sent=new LongAdder();

/** The average number of messages in a received {@link MessageBatch} */
@ManagedAttribute(description="The average number of messages in a batch removed from the table and delivered to the application")
protected final AverageMinMax avg_batch_size=new AverageMinMax();

@ManagedAttribute(description="Is the retransmit task running")
public boolean isXmitTaskRunning() {return xmit_task != null && !xmit_task.isDone();}

Expand Down Expand Up @@ -456,6 +460,7 @@ public void resetStats() {
xmit_rsps_sent.reset();
stability_msgs.clear();
digest_history.clear();
avg_batch_size.clear();
Table<Message> table=local_addr != null? xmit_table.get(local_addr) : null;
if(table != null)
table.resetStats();
Expand Down Expand Up @@ -936,20 +941,25 @@ protected void removeAndDeliver(Table<Message> buf, Address sender, boolean loop
boolean remove_msgs=discard_delivered_msgs && !loopback;
MessageBatch batch=new MessageBatch(buf.size()).dest(null).sender(sender).clusterName(cluster_name).multicast(true);
Supplier<MessageBatch> batch_creator=() -> batch;
MessageBatch mb=null;
do {
try {
batch.reset();
// Don't include DUMMY and OOB_DELIVERED messages in the removed set
buf.removeMany(remove_msgs, max_batch_size, no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs,
batch_creator, BATCH_ACCUMULATOR);
mb=buf.removeMany(remove_msgs, max_batch_size, no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs,
batch_creator, BATCH_ACCUMULATOR);
}
catch(Throwable t) {
log.error("failed removing messages from table for " + sender, t);
}
if(!batch.isEmpty())
int size=batch.size();
if(size > 0) {
if(stats)
avg_batch_size.add(size);
deliverBatch(batch);
}
}
while(adders.decrementAndGet() != 0);
while(mb != null || adders.decrementAndGet() != 0);
if(rebroadcasting)
checkForRebroadcasts();
}
Expand All @@ -964,7 +974,8 @@ protected void removeAndDeliver(Table<Message> buf, Address sender, boolean loop
* @param original_sender The member who originally sent the messsage. Guaranteed to be non-null
*/
protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Address original_sender) {
log.trace("%s <-- %s: XMIT(%s%s)", local_addr, xmit_requester, original_sender, missing_msgs);
if(is_trace)
log.trace("%s <-- %s: XMIT(%s%s)", local_addr, xmit_requester, original_sender, missing_msgs);

if(stats)
xmit_reqs_received.add(missing_msgs.size());
Expand All @@ -975,15 +986,15 @@ protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Add
return;
}

if(is_trace)
log.trace("%s --> [all]: resending to %s %s", local_addr, original_sender, missing_msgs);
for(long i: missing_msgs) {
Message msg=buf.get(i);
if(msg == null) {
if(log.isWarnEnabled() && log_not_found_msgs && !local_addr.equals(xmit_requester) && i > buf.getLow())
log.warn(Util.getMessage("MessageNotFound"), local_addr, original_sender, i);
continue;
}
if(is_trace)
log.trace("%s --> [all]: resending %s#%d", local_addr, original_sender, i);
sendXmitRsp(xmit_requester, msg);
}
}
Expand Down Expand Up @@ -1070,6 +1081,9 @@ protected void sendXmitRsp(Address dest, Message msg) {
msg.setSrc(local_addr);

if(use_mcast_xmit) { // we simply send the original multicast message
// we modify the original message (instead of copying it) by setting flag DONT_BLOCK: this is fine because
// the original sender will send the message without this flag; only retransmissions will carry the flag
msg.setFlag(DONT_BLOCK);
resend(msg);
return;
}
Expand Down Expand Up @@ -1477,7 +1491,8 @@ protected void retransmit(SeqnoList missing_msgs, final Address sender, boolean
Message retransmit_msg=new ObjectMessage(dest, missing_msgs).setFlag(OOB, NO_FC).setFlag(DONT_BLOCK)
.putHeader(this.id, NakAckHeader2.createXmitRequestHeader(sender));

log.trace("%s --> %s: XMIT_REQ(%s)", local_addr, dest, missing_msgs);
if(is_trace)
log.trace("%s --> %s: XMIT_REQ(%s)", local_addr, dest, missing_msgs);
down_prot.down(retransmit_msg);
if(stats)
xmit_reqs_sent.add(missing_msgs.size());
Expand Down Expand Up @@ -1513,7 +1528,6 @@ protected void stopRetransmitTask() {
}



/**
* Retransmitter task which periodically (every xmit_interval ms) looks at all the retransmit tables and
* sends retransmit request to all members from which we have missing messages
Expand Down
19 changes: 14 additions & 5 deletions tests/junit-functional/org/jgroups/tests/TableTest.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package org.jgroups.tests;

import org.jgroups.BytesMessage;
import org.jgroups.EmptyMessage;
import org.jgroups.Global;
import org.jgroups.Message;
import org.jgroups.*;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.NakAckHeader2;
Expand Down Expand Up @@ -854,10 +851,22 @@ public void testRemoveManyIntoMessageBatch() {
assert result == null;
}

public void testRemoveManyWithMaxBatchSize() {
Table<Message> table=new Table<>();
IntStream.rangeClosed(1, 1024).forEach(n -> table.add(n,new ObjectMessage(null, "hello-" + n)));
assert table.size() == 1024;
assert table.getNumMissing() == 0;

MessageBatch batch=new MessageBatch(128);
Supplier<MessageBatch> batch_creator=() -> batch;
BiConsumer<MessageBatch,Message> accumulator=MessageBatch::add;
table.removeMany(true, 512, null, batch_creator, accumulator);
assert table.size() == 512;
}

public void testForEach() {
class MyVisitor<T> implements Table.Visitor<T> {
List<int[]> list=new ArrayList<>(20);
final List<int[]> list=new ArrayList<>(20);
public boolean visit(long seqno, T element, int row, int column) {
System.out.println("#" + seqno + ": " + element + ", row=" + row + ", column=" + column);
list.add(new int[]{row,column});
Expand Down

0 comments on commit ab03668

Please sign in to comment.