Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMQ-8463] Add advancedMessageStatistics feature #1329

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -620,4 +620,59 @@ public long getNetworkDequeues() {
return destination.getDestinationStatistics().getNetworkDequeues().getCount();
}

@Override
public boolean isAdvancedMessageStatisticsEnabled() {
return destination.isAdvancedMessageStatisticsEnabled();
}

@Override
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
destination.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
}

@Override
public long getEnqueuedMessageBrokerInTime() {
return destination.getDestinationStatistics().getEnqueuedMessageBrokerInTime().getValue();
}

@Override
public String getEnqueuedMessageClientId() {
return destination.getDestinationStatistics().getEnqueuedMessageClientID().getValue();
}

@Override
public String getEnqueuedMessageId() {
return destination.getDestinationStatistics().getEnqueuedMessageID().getValue();
}

@Override
public long getEnqueuedMessageTimestamp() {
return destination.getDestinationStatistics().getEnqueuedMessageTimestamp().getValue();
}

@Override
public long getDequeuedMessageBrokerInTime() {
return destination.getDestinationStatistics().getDequeuedMessageBrokerInTime().getValue();
}

@Override
public long getDequeuedMessageBrokerOutTime() {
return destination.getDestinationStatistics().getDequeuedMessageBrokerOutTime().getValue();
}

@Override
public String getDequeuedMessageClientId() {
return destination.getDestinationStatistics().getDequeuedMessageClientID().getValue();
}

@Override
public String getDequeuedMessageId() {
return destination.getDestinationStatistics().getDequeuedMessageID().getValue();
}

@Override
public long getDequeuedMessageTimestamp() {
return destination.getDestinationStatistics().getDequeuedMessageTimestamp().getValue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -493,4 +493,37 @@ public String sendTextMessageWithProperties(@MBeanInfo("properties") String prop

@MBeanInfo("Number of messages acknowledged from the destination via network connection")
long getNetworkDequeues();

@MBeanInfo("Query Advanced Message Statistics flag")
boolean isAdvancedMessageStatisticsEnabled();

@MBeanInfo("Toggle Advanced Message Statistics flag")
void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled);

@MBeanInfo("Broker in time (ms) of last enqueued message to the destination")
long getEnqueuedMessageBrokerInTime();

@MBeanInfo("ClientID of last enqueued message to the destination")
String getEnqueuedMessageClientId();

@MBeanInfo("MessageID of last enqueued message to the destination")
String getEnqueuedMessageId();

@MBeanInfo("Message timestamp in (ms) of last enqueued message to the destination")
long getEnqueuedMessageTimestamp();

@MBeanInfo("Broker in time (ms) of last dequeued message to the destination")
long getDequeuedMessageBrokerInTime();

@MBeanInfo("Broker out time (ms) of last dequeued message to the destination")
long getDequeuedMessageBrokerOutTime();

@MBeanInfo("ClientID of last dequeued message to the destination")
String getDequeuedMessageClientId();

@MBeanInfo("MessageID of last dequeued message to the destination")
String getDequeuedMessageId();

@MBeanInfo("Message timestamp in (ms) of last dequeued message to the destination")
long getDequeuedMessageTimestamp();
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public abstract class BaseDestination implements Destination {
private boolean disposed = false;
private boolean doOptimzeMessageStorage = true;
private boolean advancedNetworkStatisticsEnabled = false;
private boolean advancedMessageStatisticsEnabled = false;

/*
* percentage of in-flight messages above which optimize message store is disabled
Expand Down Expand Up @@ -880,6 +881,16 @@ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatistic
this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
}

@Override
public boolean isAdvancedMessageStatisticsEnabled() {
return this.advancedMessageStatisticsEnabled;
}

@Override
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
this.advancedMessageStatisticsEnabled = advancedMessageStatisticsEnabled;
}

@Override
public abstract List<Subscription> getConsumers();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,9 @@ public interface Destination extends Service, Task, Message.MessageDestination {

void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled);

// [AMQ-8463]
boolean isAdvancedMessageStatisticsEnabled();

void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled);

}
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,16 @@ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatistic
next.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
}

@Override
public boolean isAdvancedMessageStatisticsEnabled() {
return next.isAdvancedMessageStatisticsEnabled();
}

@Override
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
next.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
}

public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (next instanceof DestinationFilter) {
DestinationFilter filter = (DestinationFilter) next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ public class DestinationStatistics extends StatsImpl {
protected CountStatisticImpl networkEnqueues;
protected CountStatisticImpl networkDequeues;

// [AMQ-8463] Advanced Message Statistics are optionally enabled
protected LongStatisticImpl enqueuedMessageBrokerInTime;
protected StringStatisticImpl enqueuedMessageClientID;
protected StringStatisticImpl enqueuedMessageID;
protected LongStatisticImpl enqueuedMessageTimestamp;
protected LongStatisticImpl dequeuedMessageBrokerInTime;
protected LongStatisticImpl dequeuedMessageBrokerOutTime;
protected StringStatisticImpl dequeuedMessageClientID;
protected StringStatisticImpl dequeuedMessageID;
protected LongStatisticImpl dequeuedMessageTimestamp;

public DestinationStatistics() {

enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
Expand All @@ -76,6 +87,17 @@ public DestinationStatistics() {
networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");

enqueuedMessageBrokerInTime = new LongStatisticImpl("enqueuedMessageBrokerInTime", "Broker in time (ms) of last enqueued message to the destination");
enqueuedMessageClientID = new StringStatisticImpl("enqueuedMessageClientID", "ClientID of last enqueued message to the destination");
enqueuedMessageID = new StringStatisticImpl("enqueuedMessageID", "MessageID of last enqueued message to the destination");
enqueuedMessageTimestamp = new LongStatisticImpl("enqueuedMessageTimestamp", "Message timestamp of last enqueued message to the destination");

dequeuedMessageBrokerInTime = new LongStatisticImpl("dequeuedMessageBrokerInTime", "Broker in time (ms) of last dequeued message to the destination");
dequeuedMessageBrokerOutTime = new LongStatisticImpl("dequeuedMessageBrokerOutTime", "Broker out time (ms) of last dequeued message to the destination");
dequeuedMessageClientID = new StringStatisticImpl("dequeuedMessageClientID", "ClientID of last dequeued message to the destination");
dequeuedMessageID = new StringStatisticImpl("dequeuedMessageID", "MessageID of last dequeued message to the destination");
dequeuedMessageTimestamp = new LongStatisticImpl("dequeuedMessageTimestamp", "Message timestamp of last dequeued message to the destination");

addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
Expand All @@ -94,6 +116,16 @@ public DestinationStatistics() {

addStatistic("networkEnqueues", networkEnqueues);
addStatistic("networkDequeues", networkDequeues);

addStatistic("enqueuedMessageBrokerInTime", enqueuedMessageBrokerInTime);
addStatistic("enqueuedMessageClientID", enqueuedMessageClientID);
addStatistic("enqueuedMessageID", enqueuedMessageID);
addStatistic("enqueuedMessageTimestamp", enqueuedMessageTimestamp);
addStatistic("dequeuedMessageBrokerInTime", dequeuedMessageBrokerInTime);
addStatistic("dequeuedMessageBrokerOutTime", dequeuedMessageBrokerOutTime);
addStatistic("dequeuedMessageClientID", dequeuedMessageClientID);
addStatistic("dequeuedMessageID", dequeuedMessageID);
addStatistic("dequeuedMessageTimestamp", dequeuedMessageTimestamp);
}

public CountStatisticImpl getEnqueues() {
Expand Down Expand Up @@ -170,6 +202,42 @@ public CountStatisticImpl getNetworkDequeues() {
return networkDequeues;
}

public LongStatisticImpl getEnqueuedMessageBrokerInTime() {
return enqueuedMessageBrokerInTime;
}

public StringStatisticImpl getEnqueuedMessageClientID() {
return enqueuedMessageClientID;
}

public StringStatisticImpl getEnqueuedMessageID() {
return enqueuedMessageID;
}

public LongStatisticImpl getEnqueuedMessageTimestamp() {
return enqueuedMessageTimestamp;
}

public LongStatisticImpl getDequeuedMessageBrokerInTime() {
return dequeuedMessageBrokerInTime;
}

public LongStatisticImpl getDequeuedMessageBrokerOutTime() {
return dequeuedMessageBrokerOutTime;
}

public StringStatisticImpl getDequeuedMessageClientID() {
return dequeuedMessageClientID;
}

public StringStatisticImpl getDequeuedMessageID() {
return dequeuedMessageID;
}

public LongStatisticImpl getDequeuedMessageTimestamp() {
return dequeuedMessageTimestamp;
}

public void reset() {
if (this.isDoReset()) {
super.reset();
Expand All @@ -186,6 +254,15 @@ public void reset() {
maxUncommittedExceededCount.reset();
networkEnqueues.reset();
networkDequeues.reset();
enqueuedMessageBrokerInTime.reset();
enqueuedMessageClientID.reset();
enqueuedMessageID.reset();
enqueuedMessageTimestamp.reset();
dequeuedMessageBrokerInTime.reset();
dequeuedMessageBrokerOutTime.reset();
dequeuedMessageClientID.reset();
dequeuedMessageID.reset();
dequeuedMessageTimestamp.reset();
}
}

Expand All @@ -208,9 +285,21 @@ public void setEnabled(boolean enabled) {
messageSize.setEnabled(enabled);
maxUncommittedExceededCount.setEnabled(enabled);

// [AMQ-9437] Advanced Statistics
// [AMQ-9437] Advanced Network Statistics
networkEnqueues.setEnabled(enabled);
networkDequeues.setEnabled(enabled);

// [AMQ-9437] Advanced Message Statistics
enqueuedMessageBrokerInTime.setEnabled(enabled);
enqueuedMessageClientID.setEnabled(enabled);
enqueuedMessageID.setEnabled(enabled);
enqueuedMessageTimestamp.setEnabled(enabled);
dequeuedMessageBrokerInTime.setEnabled(enabled);
dequeuedMessageBrokerOutTime.setEnabled(enabled);
dequeuedMessageClientID.setEnabled(enabled);
dequeuedMessageID.setEnabled(enabled);
dequeuedMessageTimestamp.setEnabled(enabled);

}

public void setParent(DestinationStatistics parent) {
Expand All @@ -233,6 +322,7 @@ public void setParent(DestinationStatistics parent) {
maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
networkEnqueues.setParent(parent.networkEnqueues);
networkDequeues.setParent(parent.networkDequeues);
// [AMQ-9437] Advanced Message Statistics do not parent.
} else {
enqueues.setParent(null);
dispatched.setParent(null);
Expand All @@ -252,6 +342,7 @@ public void setParent(DestinationStatistics parent) {
maxUncommittedExceededCount.setParent(null);
networkEnqueues.setParent(null);
networkDequeues.setParent(null);
// [AMQ-9437] Advanced Message Statistics do not parent.
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,13 @@ public void afterRollback() throws Exception {
if(((Destination)node.getRegionDestination()).isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
}
if(((Destination)node.getRegionDestination()).isAdvancedMessageStatisticsEnabled()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageBrokerInTime().setValue(node.getMessage().getBrokerInTime());
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageBrokerOutTime().setValue(node.getMessage().getBrokerOutTime());
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageClientID().setValue(context.getClientId());
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageID().setValue(node.getMessageId().toString());
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageTimestamp().setValue(node.getMessage().getTimestamp());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1916,6 +1916,14 @@ private void dropMessage(ConnectionContext context, QueueMessageReference refere
getDestinationStatistics().getDequeues().increment();
getDestinationStatistics().getMessages().decrement();

if(isAdvancedMessageStatisticsEnabled()) {
getDestinationStatistics().getDequeuedMessageBrokerInTime().setValue(reference.getMessage().getBrokerInTime());
getDestinationStatistics().getDequeuedMessageBrokerOutTime().setValue(reference.getMessage().getBrokerOutTime());
getDestinationStatistics().getDequeuedMessageClientID().setValue(context.getClientId());
getDestinationStatistics().getDequeuedMessageID().setValue(reference.getMessageId().toString());
getDestinationStatistics().getDequeuedMessageTimestamp().setValue(reference.getMessage().getTimestamp());
}

if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
getDestinationStatistics().getNetworkDequeues().increment();
}
Expand Down Expand Up @@ -1975,6 +1983,13 @@ final void messageSent(final ConnectionContext context, final Message msg) throw
destinationStatistics.getMessages().increment();
destinationStatistics.getMessageSize().addSize(msg.getSize());

if(isAdvancedMessageStatisticsEnabled()) {
destinationStatistics.getEnqueuedMessageBrokerInTime().setValue(msg.getBrokerInTime());
destinationStatistics.getEnqueuedMessageClientID().setValue(context.getClientId());
destinationStatistics.getEnqueuedMessageID().setValue(msg.getMessageId().toString());
destinationStatistics.getEnqueuedMessageTimestamp().setValue(msg.getTimestamp());
}

if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
destinationStatistics.getNetworkEnqueues().increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,13 @@ protected void dispatch(final ConnectionContext context, Message message) throws
// destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();

if(isAdvancedMessageStatisticsEnabled()) {
destinationStatistics.getEnqueuedMessageBrokerInTime().setValue(message.getBrokerInTime());
destinationStatistics.getEnqueuedMessageClientID().setValue(context.getClientId());
destinationStatistics.getEnqueuedMessageID().setValue(message.getMessageId().toString());
destinationStatistics.getEnqueuedMessageTimestamp().setValue(message.getTimestamp());
}

if(isAdvancedNetworkStatisticsEnabled() && context != null && context.isNetworkConnection()) {
destinationStatistics.getNetworkEnqueues().increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,10 @@ private void incrementStatsOnAck(final Destination destination, final MessageAck
destination.getDestinationStatistics().getNetworkDequeues().add(count);
}
}
if(destination.isAdvancedMessageStatisticsEnabled()) {
destination.getDestinationStatistics().getDequeuedMessageClientID().setValue(context.getClientId());
destination.getDestinationStatistics().getDequeuedMessageID().setValue(ack.getLastMessageId().toString());
}
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(count);
}
Expand Down
Loading