Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
- Added MsgStatsTest
  • Loading branch information
belaban committed Apr 15, 2024
1 parent 3c9a756 commit d31ce13
Show file tree
Hide file tree
Showing 14 changed files with 322 additions and 215 deletions.
2 changes: 0 additions & 2 deletions src/org/jgroups/protocols/AlternatingBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ protected void _sendBundledMessages() {
else {
avg_batch_size.add(target_list.size());
sendMessageList(target_dest, target_list.get(0).getSrc(), target_list);
if(transport.statsEnabled())
transport.getMessageStats().incrNumBatchesSent(1);
}
}
finally {
Expand Down
6 changes: 2 additions & 4 deletions src/org/jgroups/protocols/BaseBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ public int getQueueSize() {
else {
Address dst=entry.getKey();
sendMessageList(dst, list.get(0).getSrc(), list);
if(transport.statsEnabled())
transport.getMessageStats().incrNumBatchesSent(1);
}
list.clear();
}
Expand All @@ -125,8 +123,7 @@ protected void sendSingleMessage(final Message msg) {
try {
Util.writeMessage(msg, output, dest == null);
transport.doSend(output.buffer(), 0, output.position(), dest);
if(transport.statsEnabled())
transport.getMessageStats().incrNumSingleMsgsSent(1);
transport.getMessageStats().incrNumSingleMsgsSent();
}
catch(Throwable e) {
log.trace(Util.getMessage("SendFailure"),
Expand All @@ -140,6 +137,7 @@ protected void sendMessageList(final Address dest, final Address src, final List
try {
Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, output, dest == null, transport.getId());
transport.doSend(output.buffer(), 0, output.position(), dest);
transport.getMessageStats().incrNumBatchesSent();
}
catch(Throwable e) {
log.trace(Util.getMessage("FailureSendingMsgBundle"), transport.getAddress(), e);
Expand Down
214 changes: 136 additions & 78 deletions src/org/jgroups/protocols/MsgStats.java
Original file line number Diff line number Diff line change
@@ -1,128 +1,179 @@
package org.jgroups.protocols;

import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.conf.AttributeType;
import org.jgroups.annotations.Property;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Util;

import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Stream;

import static org.jgroups.conf.AttributeType.BYTES;
import static org.jgroups.conf.AttributeType.SCALAR;

/**
* Class which has all the stats about received/sent messages etc (in TP)
* Class which has all the stats about received/sent messages etc. (in TP)
* @author Bela Ban
* @since 4.0
*/
public class MsgStats {
protected final AtomicLong num_msgs_sent=new AtomicLong();
protected final AtomicLong num_msgs_received=new AtomicLong();
@Property(description="Whether metrics should be logged")
protected boolean enabled=true;

@ManagedAttribute(description="Number of unicast messages sent",type=SCALAR)
protected final LongAdder num_ucasts_sent=new LongAdder();
@ManagedAttribute(description="Number of unicast messages received",type=SCALAR)
protected final LongAdder num_ucasts_received=new LongAdder();

@ManagedAttribute(description="Number of multicast messages sent",type=SCALAR)
protected final LongAdder num_mcasts_sent=new LongAdder();
@ManagedAttribute(description="Total number of multicast messages received",type=SCALAR)
protected final LongAdder num_mcasts_received=new LongAdder();

@ManagedAttribute(description="Number of single messages sent (by the bundler)",type=SCALAR)
protected final LongAdder num_single_msgs_sent=new LongAdder();
@ManagedAttribute(description="Number of single messages received (by the transport)",type=SCALAR)
protected final LongAdder num_single_msgs_received=new LongAdder();

@ManagedAttribute(description="Number of batches sent (by the bundler)",type=SCALAR)
protected final LongAdder num_batches_sent=new LongAdder();
@ManagedAttribute(description="Number of batches received (by the transport)",type=SCALAR)
protected final LongAdder num_batches_received=new LongAdder();

protected final AtomicLong num_ucasts_sent=new AtomicLong();
protected final AtomicLong num_mcasts_sent=new AtomicLong();
/** The average number of messages in a received {@link MessageBatch} */
protected final AverageMinMax avg_batch_size=new AverageMinMax();

protected final AtomicLong num_ucasts_received=new AtomicLong();
protected final AtomicLong num_mcasts_received=new AtomicLong();
@ManagedAttribute(description="Number of messages rejected by the thread pool (because it was full)",type=SCALAR)
protected final LongAdder num_rejected_msgs=new LongAdder();

protected final AtomicLong num_bytes_sent=new AtomicLong();
protected final AtomicLong num_bytes_received=new AtomicLong();
@ManagedAttribute(description="Number of multicast bytes sent",type=BYTES)
protected final LongAdder num_mcast_bytes_sent=new LongAdder();
@ManagedAttribute(description="Number of multicast bytes received",type=BYTES)
protected final LongAdder num_mcast_bytes_received=new LongAdder();

protected final AtomicLong num_ucast_bytes_sent=new AtomicLong();
protected final AtomicLong num_mcast_bytes_sent=new AtomicLong();
@ManagedAttribute(description="Number of unicast bytes sent",type=BYTES)
protected final LongAdder num_ucast_bytes_sent=new LongAdder();
@ManagedAttribute(description="Number of unicast bytes received",type=BYTES)
protected final LongAdder num_ucast_bytes_received=new LongAdder();

protected final AtomicLong num_ucast_bytes_received=new AtomicLong();
protected final AtomicLong num_mcast_bytes_received=new AtomicLong();
public boolean enabled() {return enabled;}
public MsgStats enable(boolean b) {enabled=b; return this;}

protected final AtomicLong num_oob_msgs_received=new AtomicLong();
@ManagedAttribute(description="Number of messages sent (mcasts and ucasts sent)",type=SCALAR)
public long getNumMsgsSent() {return num_mcasts_sent.sum() + num_ucasts_sent.sum();}

protected final AtomicLong num_single_msgs_sent=new AtomicLong();
protected final AtomicLong num_batches_sent=new AtomicLong();
protected final AtomicLong num_batches_received=new AtomicLong();
@ManagedAttribute(description="Number of messages received (mcasts and ucasts received)",type=SCALAR)
public long getNumMsgsReceived() {return num_mcasts_received.sum() + num_ucasts_received.sum();}

protected final AtomicInteger num_rejected_msgs=new AtomicInteger(0);
@ManagedAttribute(description="Returns the average batch size of received batches")
public String getAvgBatchSize() {return avg_batch_size.toString();}

public AverageMinMax avgBatchSize() {return avg_batch_size;}

@ManagedAttribute(description="Number of messages sent",type=AttributeType.SCALAR)
public long getNumMsgsSent() {return num_msgs_sent.get();}
public MsgStats incrNumMsgsSent(int d) {num_msgs_sent.addAndGet(d); return this;}
@ManagedAttribute(description="Total number of bytes sent (unicast + multicast bytes)",type=BYTES)
public long getNumBytesSent() {return num_mcast_bytes_sent.sum() + num_ucast_bytes_sent.sum();}

@ManagedAttribute(description="Number of unicast messages sent",type=AttributeType.SCALAR)
public long getNumUcastMsgsSent() {return num_ucasts_sent.get();}
public MsgStats incrNumUcastMsgsSent(int d) {num_ucasts_sent.addAndGet(d); return this;}
@ManagedAttribute(description="Total number of bytes received (unicast + multicast)",type=BYTES)
public long getNumBytesReceived() {return num_mcast_bytes_received.sum() + num_ucast_bytes_received.sum();}

@ManagedAttribute(description="Number of multicast messages sent",type=AttributeType.SCALAR)
public long getNumMcastMsgsSent() {return num_mcasts_sent.get();}
public MsgStats incrNumMcastMsgsSent(int d) {num_mcasts_sent.addAndGet(d); return this;}
public long getNumUcastsSent() {return num_ucasts_sent.sum();}

@ManagedAttribute(description="Number of unicast messages received",type=AttributeType.SCALAR)
public long getNumUcastMsgsReceived() {return num_ucasts_received.get();}
public MsgStats incrNumUcastMsgsReceived(int d) {num_ucasts_received.addAndGet(d); return this;}
public long getNumMcastsSent() {return num_mcasts_sent.sum();}

@ManagedAttribute(description="Number of multicast messages received",type=AttributeType.SCALAR)
public long getNumMcastMsgsReceived() {return num_mcasts_received.get();}
public MsgStats incrNumMcastMsgsReceived(int d) {num_mcasts_received.addAndGet(d); return this;}
public long getNumUcastsReceived() {return num_ucasts_received.sum();}

@ManagedAttribute(description="Number of regular messages received",type=AttributeType.SCALAR)
public long getNumMsgsReceived() {return num_msgs_received.get();}
public MsgStats incrNumMsgsReceived(int d) {num_msgs_received.addAndGet(d); return this;}
public long getNumMcastsReceived() {return num_mcasts_received.sum();}

@ManagedAttribute(description="Number of OOB messages received. This value is included in num_msgs_received."
,type=AttributeType.SCALAR)
public long getNumOOBMsgsReceived() {return num_oob_msgs_received.get();}
public MsgStats incrNumOOBMsgsReceived(int d) {num_oob_msgs_received.addAndGet(d); return this;}
public long getNumSingleMsgsSent() {return num_single_msgs_sent.sum();}
public MsgStats incrNumSingleMsgsSent() {num_single_msgs_sent.increment(); return this;}

@ManagedAttribute(description="Number of single messages sent",type=AttributeType.SCALAR)
public long getNumSingleMsgsSent() {return num_single_msgs_sent.get();}
public MsgStats incrNumSingleMsgsSent(int d) {num_single_msgs_sent.addAndGet(d); return this;}
public long getNumBatchesSent() {return num_batches_sent.sum();}
public MsgStats incrNumBatchesSent() {num_batches_sent.increment(); return this;}
public MsgStats incrNumBatchesSent(int n) {num_batches_sent.add(n); return this;}

@ManagedAttribute(description="Number of message batches sent",type=AttributeType.SCALAR)
public long getNumBatchesSent() {return num_batches_sent.get();}
public MsgStats incrNumBatchesSent(int d) {num_batches_sent.addAndGet(d); return this;}
public long getNumBatchesReceived() {return num_batches_received.sum();}

@ManagedAttribute(description="Number of message batches received",type=AttributeType.SCALAR)
public long getNumBatchesReceived() {return num_batches_received.get();}
public MsgStats incrNumBatchesReceived(int d) {num_batches_received.addAndGet(d); return this;}
public long getNumUcastBytesSent() {return num_ucast_bytes_sent.sum();}

@ManagedAttribute(description="Number of bytes sent",type=AttributeType.BYTES)
public long getNumBytesSent() {return num_bytes_sent.get();}
public MsgStats incrNumBytesSent(int d) {num_bytes_sent.addAndGet(d); return this;}
public long getNumMcastBytesSent() {return num_mcast_bytes_sent.sum();}

@ManagedAttribute(description="Number of unicast bytes sent",type=AttributeType.BYTES)
public long getNumUcastBytesSent() {return num_ucast_bytes_sent.get();}
public MsgStats incrNumUcastBytesSent(int d) {num_ucast_bytes_sent.addAndGet(d); return this;}
public long getNumUcastBytesReceived() {return num_ucast_bytes_received.sum();}

@ManagedAttribute(description="Number of multicast bytes sent",type=AttributeType.BYTES)
public long getNumMcastBytesSent() {return num_mcast_bytes_sent.get();}
public MsgStats incrNumMcastBytesSent(int d) {num_mcast_bytes_sent.addAndGet(d); return this;}
public long getNumMcastBytesReceived() {return num_mcast_bytes_received.sum();}

@ManagedAttribute(description="Number of bytes received",type=AttributeType.BYTES)
public long getNumBytesReceived() {return num_bytes_received.get();}
public MsgStats incrNumBytesReceived(int d) {num_bytes_received.addAndGet(d); return this;}
public long getNumRejectedMsgs() {return num_rejected_msgs.sum();}
public MsgStats incrNumRejectedMsgs() {num_rejected_msgs.increment(); return this;}

@ManagedAttribute(description="Number of unicast bytes received",type=AttributeType.BYTES)
public long getNumUcastBytesReceived() {return num_ucast_bytes_received.get();}
public MsgStats incrNumUcastBytesReceived(int d) {num_ucast_bytes_received.addAndGet(d); return this;}

@ManagedAttribute(description="Number of multicast bytes received",type=AttributeType.BYTES)
public long getNumMcastBytesReceived() {return num_mcast_bytes_received.get();}
public MsgStats incrNumMcastBytesReceived(int d) {num_mcast_bytes_received.addAndGet(d); return this;}
public MsgStats sent(Address dest, int length) {
if(!enabled)
return this;
if(dest == null) // multicast
return add(num_mcasts_sent, 1, num_mcast_bytes_sent, length);
return add(num_ucasts_sent, 1, num_ucast_bytes_sent, length);
}

@ManagedAttribute(description="Number of dropped messages that were rejected by the thread pool"
,type=AttributeType.SCALAR)
public int getNumRejectedMsgs() {return num_rejected_msgs.get();}
public MsgStats incrNumRejectedMsgs(int d) {num_rejected_msgs.addAndGet(d); return this;}
public MsgStats sent(Message msg) {
return (msg == null || !enabled)? this : sent(msg.dest(), msg.getLength());
}

public MsgStats received(Address dest, int length) {
if(!enabled)
return this;
if(dest == null)
return add(num_mcasts_received, 1, num_mcast_bytes_received, length);
return add(num_ucasts_received, 1, num_ucast_bytes_received, length);
}

public MsgStats received(Message msg) {
if(msg == null || !enabled)
return this;
num_single_msgs_received.increment();
return received(msg.dest(), msg.getLength());
}

public MsgStats received(MessageBatch batch) {
if(batch == null || !enabled)
return this;
num_batches_received.increment();
int num_msgs=batch.size();
int length=batch.length();
avg_batch_size.add(num_msgs);
if(batch.dest() == null)
return add(num_mcasts_received, num_msgs, num_mcast_bytes_received, length);
return add(num_ucasts_received, num_msgs, num_ucast_bytes_received, length);
}


public MsgStats reset() {
Stream.of(num_msgs_sent, num_msgs_received, num_single_msgs_sent, num_oob_msgs_received,
num_batches_sent, num_batches_received, num_bytes_sent,num_bytes_received)
.forEach(al -> al.set(0));
Stream.of(num_rejected_msgs).forEach(ai -> ai.set(0));
Stream.of(num_ucasts_sent, num_ucasts_received,
num_mcasts_sent, num_mcasts_received,
num_single_msgs_sent, num_single_msgs_received,
num_batches_sent, num_batches_received,
num_rejected_msgs,
num_mcast_bytes_sent, num_mcast_bytes_received,
num_ucast_bytes_sent, num_ucast_bytes_received)
.forEach(LongAdder::reset);
avg_batch_size.clear();
return this;
}

@Override
public String toString() {
return toString(false);
}

public String toString(boolean details) {
StringBuilder sb=new StringBuilder();
if(!details) {
sb.append(String.format("%,d sent (%s) %,d received (%s)", getNumMsgsSent(), Util.printBytes(getNumBytesSent()),
getNumMsgsReceived(), Util.printBytes(getNumBytesReceived())));
return sb.toString();
}
Field[] fields=MsgStats.class.getDeclaredFields();
for(Field field: fields) {
try {
Expand All @@ -135,4 +186,11 @@ public String toString() {
}
return sb.toString();
}

protected MsgStats add(LongAdder msgs, int num_msgs, LongAdder bytes, int length) {
msgs.add(num_msgs);
if(length > 0)
bytes.add(length);
return this;
}
}
3 changes: 1 addition & 2 deletions src/org/jgroups/protocols/NoBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ protected void sendSingleMessage(final Message msg, final ByteArrayDataOutputStr
output.position(0);
Util.writeMessage(msg, output, dest == null);
transport.doSend(output.buffer(), 0, output.position(), dest);
if(transport.statsEnabled())
transport.getMessageStats().incrNumSingleMsgsSent(1);
transport.getMessageStats().incrNumSingleMsgsSent();
}

}
6 changes: 2 additions & 4 deletions src/org/jgroups/protocols/PerDestinationBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ protected void sendSingleMessage(final Address dest, final Message msg) {
output.position(0);
Util.writeMessage(msg, output, dest == null);
transport.doSend(output.buffer(), 0, output.position(), dest);
if(transport.statsEnabled())
transport.getMessageStats().incrNumSingleMsgsSent(1);
transport.getMessageStats().incrNumSingleMsgsSent();
num_single_msgs_sent.increment();
}
catch(Throwable e) {
Expand All @@ -252,8 +251,7 @@ protected void sendMessageList(final Address dest, final Address src, final Fast
Util.writeMessageList(dest, src, transport.cluster_name.chars(), list,
output, dest == null, transport.getId());
transport.doSend(output.buffer(), 0, output.position(), dest);
if(transport.statsEnabled())
transport.getMessageStats().incrNumBatchesSent(1);
transport.getMessageStats().incrNumBatchesSent();
num_batches_sent.increment();
}
catch(Throwable e) {
Expand Down
3 changes: 1 addition & 2 deletions src/org/jgroups/protocols/RingBufferBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ public void sendBundledMessages(final Message[] buf, final int read_index, final
output.position(current_pos);
}
transport.doSend(output.buffer(), 0, output.position(), dest);
if(transport.statsEnabled())
transport.getMessageStats().incrNumBatchesSent(num_msgs);
transport.getMessageStats().incrNumBatchesSent(num_msgs);
}
catch(Exception ex) {
log.trace("failed to send message(s) to %s: %s", dest == null? "group" : dest, ex.getMessage());
Expand Down
3 changes: 1 addition & 2 deletions src/org/jgroups/protocols/RingBufferBundlerLockless.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ protected int sendBundledMessages(final Message[] buf, final int read_index, int
output.position(current_pos);
}
transport.doSend(output.buffer(), 0, output.position(), dest);
if(transport.statsEnabled())
transport.getMessageStats().incrNumBatchesSent(num_msgs);
transport.getMessageStats().incrNumBatchesSent(num_msgs);
}
catch(Exception ex) {
log.trace("failed to send message(s)", ex);
Expand Down
Loading

0 comments on commit d31ce13

Please sign in to comment.