From ab036685991c2af5278219de50e0029fc6b4c706 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Tue, 16 Apr 2024 12:00:52 +0200 Subject: [PATCH] - max_batch_size > 0 doesn't remove all messages (https://issues.redhat.com/browse/JGRP-2773) - Deprecated flushing in JChannel (https://issues.redhat.com/browse/JGRP-2784) --- src/org/jgroups/JChannel.java | 9 ++++++ src/org/jgroups/protocols/UNICAST3.java | 12 +++---- src/org/jgroups/protocols/pbcast/NAKACK2.java | 32 +++++++++++++------ .../org/jgroups/tests/TableTest.java | 19 ++++++++--- 4 files changed, 52 insertions(+), 20 deletions(-) diff --git a/src/org/jgroups/JChannel.java b/src/org/jgroups/JChannel.java index e528e4fbce4..b3e87719fb3 100644 --- a/src/org/jgroups/JChannel.java +++ b/src/org/jgroups/JChannel.java @@ -192,6 +192,8 @@ else if(ip_version == StackType.Dual) public JChannel stats(boolean stats) {this.stats=stats; return this;} public boolean getDiscardOwnMessages() {return discard_own_messages;} public JChannel setDiscardOwnMessages(boolean flag) {discard_own_messages=flag; return this;} + + @Deprecated(since="5.3.5",forRemoval=true) public boolean flushSupported() {return flush_supported;} @@ -316,6 +318,7 @@ public synchronized JChannel connect(String cluster_name) throws Exception { } /** Connects the channel to a cluster. 
*/ + @Deprecated(since="5.3.5",forRemoval=true) @ManagedOperation(description="Connects the channel to a group") protected synchronized JChannel connect(String cluster_name, boolean useFlushIfPresent) throws Exception { if(!_preConnect(cluster_name)) @@ -360,6 +363,7 @@ public synchronized JChannel connect(String cluster_name, Address target, long t * @exception StateTransferException State transfer was not successful * */ + @Deprecated(since="5.3.5",forRemoval=true) public synchronized JChannel connect(String cluster_name, Address target, long timeout, boolean useFlushIfPresent) throws Exception { if(!_preConnect(cluster_name)) @@ -506,6 +510,7 @@ public JChannel getState(Address target, long timeout) throws Exception { /** Retrieves state from the target member. See {@link #getState(Address,long)} for details */ + @Deprecated(since="5.3.5",forRemoval=true) public JChannel getState(Address target, long timeout, boolean useFlushIfPresent) throws Exception { Callable flusher =() -> Util.startFlush(JChannel.this); return getState(target, timeout, useFlushIfPresent?flusher:null); @@ -521,6 +526,7 @@ public JChannel getState(Address target, long timeout, boolean useFlushIfPresent * JGroups provides a helper random sleep time backoff algorithm for flush using Util class. * @param automatic_resume if true call {@link #stopFlush()} after the flush */ + @Deprecated(since="5.3.5",forRemoval=true) public JChannel startFlush(boolean automatic_resume) throws Exception { if(!flushSupported()) throw new IllegalStateException("Flush is not supported, add pbcast.FLUSH protocol to your configuration"); @@ -545,6 +551,7 @@ public JChannel startFlush(boolean automatic_resume) throws Exception { * {@link #stopFlush(List)} method with the same list of members used in {@link #startFlush(List, boolean)}. * @param automatic_resume if true call {@link #stopFlush()} after the flush */ + @Deprecated(since="5.3.5",forRemoval=true) public JChannel startFlush(List
<Address> flushParticipants, boolean automatic_resume) throws Exception { if (!flushSupported()) throw new IllegalStateException("Flush is not supported, add pbcast.FLUSH protocol to your configuration"); @@ -567,6 +574,7 @@ public JChannel startFlush(List
<Address> flushParticipants, boolean automatic_re } /** Stops the current flush round. Cluster members are unblocked and allowed to send new and pending messages */ + @Deprecated(since="5.3.5",forRemoval=true) public JChannel stopFlush() { if(!flushSupported()) throw new IllegalStateException("Flush is not supported, add pbcast.FLUSH protocol to your configuration"); @@ -581,6 +589,7 @@ public JChannel stopFlush() { * same list of members prior to invocation of this method. * @param flushParticipants the flush participants */ + @Deprecated(since="5.3.5",forRemoval=true) public JChannel stopFlush(List
<Address> flushParticipants) { if(!flushSupported()) throw new IllegalStateException("Flush is not supported, add pbcast.FLUSH protocol to your configuration"); diff --git a/src/org/jgroups/protocols/UNICAST3.java b/src/org/jgroups/protocols/UNICAST3.java index 4c8b16d4e04..77b350fbba0 100644 --- a/src/org/jgroups/protocols/UNICAST3.java +++ b/src/org/jgroups/protocols/UNICAST3.java @@ -88,7 +88,7 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address>
{ @Property(description="Max number of messages to ask for in a retransmit request. 0 disables this and uses " + "the max bundle size in the transport") - protected int max_xmit_req_size=1024; + protected int max_xmit_req_size; @Property(description="The max size of a message batch when delivering messages. 0 is unbounded") protected int max_batch_size; @@ -949,14 +949,14 @@ protected void removeAndDeliver(Table win, Address sender) { if(adders.getAndIncrement() != 0) return; - final MessageBatch batch=new MessageBatch(win.getNumDeliverable()) - .dest(local_addr).sender(sender).multicast(false); + MessageBatch batch=new MessageBatch(win.getNumDeliverable()).dest(local_addr).sender(sender).multicast(false); Supplier batch_creator=() -> batch; + MessageBatch mb=null; do { try { batch.reset(); // sets index to 0: important as batch delivery may not remove messages from batch! - win.removeMany(true, max_batch_size, drop_oob_and_dont_loopback_msgs_filter, - batch_creator, BATCH_ACCUMULATOR); + mb=win.removeMany(true, max_batch_size, drop_oob_and_dont_loopback_msgs_filter, + batch_creator, BATCH_ACCUMULATOR); } catch(Throwable t) { log.error("%s: failed removing messages from table for %s: %s", local_addr, sender, t); @@ -966,7 +966,7 @@ protected void removeAndDeliver(Table win, Address sender) { deliverBatch(batch); // catches Throwable } } - while(adders.decrementAndGet() != 0); + while(mb != null || adders.decrementAndGet() != 0); } diff --git a/src/org/jgroups/protocols/pbcast/NAKACK2.java b/src/org/jgroups/protocols/pbcast/NAKACK2.java index f13ce570804..44629d3fde6 100644 --- a/src/org/jgroups/protocols/pbcast/NAKACK2.java +++ b/src/org/jgroups/protocols/pbcast/NAKACK2.java @@ -172,6 +172,10 @@ public class NAKACK2 extends Protocol implements DiagnosticsHandler.ProbeHandler @ManagedAttribute(description="Number of retransmit responses sent",type=AttributeType.SCALAR) protected final LongAdder xmit_rsps_sent=new LongAdder(); + /** The average number of messages in 
a received {@link MessageBatch} */ + @ManagedAttribute(description="The average number of messages in a batch removed from the table and delivered to the application") + protected final AverageMinMax avg_batch_size=new AverageMinMax(); + @ManagedAttribute(description="Is the retransmit task running") public boolean isXmitTaskRunning() {return xmit_task != null && !xmit_task.isDone();} @@ -456,6 +460,7 @@ public void resetStats() { xmit_rsps_sent.reset(); stability_msgs.clear(); digest_history.clear(); + avg_batch_size.clear(); Table table=local_addr != null? xmit_table.get(local_addr) : null; if(table != null) table.resetStats(); @@ -936,20 +941,25 @@ protected void removeAndDeliver(Table buf, Address sender, boolean loop boolean remove_msgs=discard_delivered_msgs && !loopback; MessageBatch batch=new MessageBatch(buf.size()).dest(null).sender(sender).clusterName(cluster_name).multicast(true); Supplier batch_creator=() -> batch; + MessageBatch mb=null; do { try { batch.reset(); // Don't include DUMMY and OOB_DELIVERED messages in the removed set - buf.removeMany(remove_msgs, max_batch_size, no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs, - batch_creator, BATCH_ACCUMULATOR); + mb=buf.removeMany(remove_msgs, max_batch_size, no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs, + batch_creator, BATCH_ACCUMULATOR); } catch(Throwable t) { log.error("failed removing messages from table for " + sender, t); } - if(!batch.isEmpty()) + int size=batch.size(); + if(size > 0) { + if(stats) + avg_batch_size.add(size); deliverBatch(batch); + } } - while(adders.decrementAndGet() != 0); + while(mb != null || adders.decrementAndGet() != 0); if(rebroadcasting) checkForRebroadcasts(); } @@ -964,7 +974,8 @@ protected void removeAndDeliver(Table buf, Address sender, boolean loop * @param original_sender The member who originally sent the messsage. 
Guaranteed to be non-null */ protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Address original_sender) { - log.trace("%s <-- %s: XMIT(%s%s)", local_addr, xmit_requester, original_sender, missing_msgs); + if(is_trace) + log.trace("%s <-- %s: XMIT(%s%s)", local_addr, xmit_requester, original_sender, missing_msgs); if(stats) xmit_reqs_received.add(missing_msgs.size()); @@ -975,6 +986,8 @@ protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Add return; } + if(is_trace) + log.trace("%s --> [all]: resending to %s %s", local_addr, original_sender, missing_msgs); for(long i: missing_msgs) { Message msg=buf.get(i); if(msg == null) { @@ -982,8 +995,6 @@ protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Add log.warn(Util.getMessage("MessageNotFound"), local_addr, original_sender, i); continue; } - if(is_trace) - log.trace("%s --> [all]: resending %s#%d", local_addr, original_sender, i); sendXmitRsp(xmit_requester, msg); } } @@ -1070,6 +1081,9 @@ protected void sendXmitRsp(Address dest, Message msg) { msg.setSrc(local_addr); if(use_mcast_xmit) { // we simply send the original multicast message + // we modify the original message (instead of copying it) by setting flag DONT_BLOCK: this is fine because + // the original sender will send the message without this flag; only retransmissions will carry the flag + msg.setFlag(DONT_BLOCK); resend(msg); return; } @@ -1477,7 +1491,8 @@ protected void retransmit(SeqnoList missing_msgs, final Address sender, boolean Message retransmit_msg=new ObjectMessage(dest, missing_msgs).setFlag(OOB, NO_FC).setFlag(DONT_BLOCK) .putHeader(this.id, NakAckHeader2.createXmitRequestHeader(sender)); - log.trace("%s --> %s: XMIT_REQ(%s)", local_addr, dest, missing_msgs); + if(is_trace) + log.trace("%s --> %s: XMIT_REQ(%s)", local_addr, dest, missing_msgs); down_prot.down(retransmit_msg); if(stats) xmit_reqs_sent.add(missing_msgs.size()); @@ -1513,7 +1528,6 @@ protected void 
stopRetransmitTask() { } - /** * Retransmitter task which periodically (every xmit_interval ms) looks at all the retransmit tables and * sends retransmit request to all members from which we have missing messages diff --git a/tests/junit-functional/org/jgroups/tests/TableTest.java b/tests/junit-functional/org/jgroups/tests/TableTest.java index 79b85726ba6..a85728b045f 100644 --- a/tests/junit-functional/org/jgroups/tests/TableTest.java +++ b/tests/junit-functional/org/jgroups/tests/TableTest.java @@ -1,9 +1,6 @@ package org.jgroups.tests; -import org.jgroups.BytesMessage; -import org.jgroups.EmptyMessage; -import org.jgroups.Global; -import org.jgroups.Message; +import org.jgroups.*; import org.jgroups.conf.ClassConfigurator; import org.jgroups.protocols.pbcast.NAKACK2; import org.jgroups.protocols.pbcast.NakAckHeader2; @@ -854,10 +851,22 @@ public void testRemoveManyIntoMessageBatch() { assert result == null; } + public void testRemoveManyWithMaxBatchSize() { + Table table=new Table<>(); + IntStream.rangeClosed(1, 1024).forEach(n -> table.add(n,new ObjectMessage(null, "hello-" + n))); + assert table.size() == 1024; + assert table.getNumMissing() == 0; + + MessageBatch batch=new MessageBatch(128); + Supplier batch_creator=() -> batch; + BiConsumer accumulator=MessageBatch::add; + table.removeMany(true, 512, null, batch_creator, accumulator); + assert table.size() == 512; + } public void testForEach() { class MyVisitor implements Table.Visitor { - List list=new ArrayList<>(20); + final List list=new ArrayList<>(20); public boolean visit(long seqno, T element, int row, int column) { System.out.println("#" + seqno + ": " + element + ", row=" + row + ", column=" + column); list.add(new int[]{row,column});