Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for Cassandra 3.x version #1

Merged
merged 2 commits into from
May 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 12 additions & 121 deletions src/main/java/com/pixonic/ctop/Main.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.pixonic.ctop;

import javax.management.*;
import com.pixonic.ctop.metrics.Metrics;
import com.pixonic.ctop.metrics.MetricsFactory;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;

public class Main {
private static volatile boolean shutdown = false;

public static void main(String[] args) throws Exception {
if (args.length == 0) {
Expand All @@ -24,131 +24,22 @@ public static void main(String[] args) throws Exception {
interval = Integer.parseInt(args[2]);
}

Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown = true));

System.out.println("Connecting to " + hostAndPort + "...");

final JMXServiceURL target = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + hostAndPort + "/jmxrmi");

final JMXConnector connector = JMXConnectorFactory.connect(target);

final MBeanServerConnection remote = connector.getMBeanServerConnection();

ObjectName objectName = new ObjectName("org.apache.cassandra.db:type=ColumnFamilies,keyspace=" + keySpace + ",columnfamily=*");

List<MonitoringEntry> items = new LinkedList<>();
for (ObjectName mbean : remote.queryNames(objectName, null)) {
AttributeList counts = remote.getAttributes(mbean, new String[] {"ReadCount", "WriteCount"});
long readCount = (Long) (((Attribute) counts.get(0)).getValue());
long writeCount = (Long) (((Attribute) counts.get(1)).getValue());
if (readCount > 0 || writeCount > 0) items.add(new MonitoringEntry(mbean, readCount, writeCount));
}
ObjectName storageMBean = new ObjectName("org.apache.cassandra.db:type=StorageService");
String releaseVersion = (String) remote.getAttribute(storageMBean, "ReleaseVersion");
int majorVersion = Integer.valueOf(releaseVersion.substring(0, releaseVersion.indexOf('.')));
System.out.println("Cassandra version is " + releaseVersion);

Metrics metrics = MetricsFactory.getMetrics(majorVersion, interval, remote, keySpace);
System.out.println("Connected. Gathering data...");
metrics.printMetrics();

while (!shutdown) {
Thread.sleep(TimeUnit.SECONDS.toMillis(interval));
Set<ResultItem> readResult = new TreeSet<>();
Set<ResultItem> writeResult = new TreeSet<>();
for (MonitoringEntry item : items) {
MonitoringEntry resultItem = calculateDifference(remote, item);
if (resultItem.readCount > 0) readResult.add(new ResultItem(item.cf, resultItem.readCount));
if (resultItem.writeCount > 0) writeResult.add(new ResultItem(item.cf, resultItem.writeCount));
}

//clear console
System.out.print("\033[H\033[2J");
System.out.flush();

System.out.println("Cassandra top v0.1");
System.out.println();
System.out.println(new Date() + " / " + interval + "s");
System.out.println();

int width = SttySupport.getTerminalWidth();
int height = SttySupport.getTerminalHeight();

int posWrite = width / 2;
String leftStr = "Reads", rightStr = "Writes";

System.out.println(makeLine(leftStr, rightStr, posWrite));
System.out.println();
Iterator<ResultItem> readIt = readResult.iterator();
Iterator<ResultItem> writeIt = writeResult.iterator();
Long maxReadCount = null, maxWriteCount = null;
for(int i = 7; i < height; i++) {
if (readIt.hasNext()) {
ResultItem resultItem = readIt.next();
if (maxReadCount == null) maxReadCount = resultItem.count;
leftStr = formatCounter(resultItem, maxReadCount);
} else {
leftStr = "";
}
if (writeIt.hasNext()) {
ResultItem resultItem = writeIt.next();
if (maxWriteCount == null) maxWriteCount = resultItem.count;
rightStr = formatCounter(resultItem, maxWriteCount);
} else {
rightStr = "";
}
if (leftStr.length() == 0 && rightStr.length() == 0) break;
System.out.println(makeLine(leftStr, rightStr, posWrite));
}
}
}

private static String formatCounter(ResultItem resultItem, long maxCount) {
int maxLen = String.valueOf(maxCount).length();
return StringUtils.leftPad(String.valueOf(resultItem.count), maxLen + 1) + " " + resultItem;
}

private static String makeLine(String left, String right, int rightPos) {
if (left.length() > rightPos) left = left.substring(0, rightPos);
if (right.length() > rightPos) left = right.substring(0, rightPos);
return StringUtils.rightPad(left, rightPos) + " " + right;
Runtime.getRuntime().addShutdownHook(new Thread(metrics::shutdown));
}

private static MonitoringEntry calculateDifference(MBeanServerConnection remote, MonitoringEntry item) throws AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException, IOException {
AttributeList counts = remote.getAttributes(item.cf, new String[] {"ReadCount", "WriteCount"});
long readCount = (Long) (((Attribute) counts.get(0)).getValue());
long writeCount = (Long) (((Attribute) counts.get(1)).getValue());

long readDf = readCount - item.readCount;
long writeDf = writeCount - item.writeCount;

item.readCount = readCount;
item.writeCount = writeCount;
return new MonitoringEntry(item.cf, readDf, writeDf);
}

private static class MonitoringEntry {
private final ObjectName cf;
private long readCount;
private long writeCount;

public MonitoringEntry(ObjectName cf, long readCount, long writeCount) {
this.cf = cf;
this.readCount = readCount;
this.writeCount = writeCount;
}
}

private static class ResultItem implements Comparable<ResultItem> {
private final ObjectName cf;
private final long count;

public ResultItem(ObjectName cf, long count) {
this.cf = cf;
this.count = count;
}

@Override public int compareTo(ResultItem o) {
long d = count - o.count;
return -(d < 0 ? -1 : (d > 0 ? 1 : 0));
}

@Override public String toString() {
return cf.getKeyProperty("columnfamily");
}
}
}
80 changes: 80 additions & 0 deletions src/main/java/com/pixonic/ctop/metrics/AbstractMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.pixonic.ctop.metrics;

import com.pixonic.ctop.StringUtils;
import com.pixonic.ctop.SttySupport;

import javax.management.MBeanServerConnection;
import java.util.Date;
import java.util.Iterator;
import java.util.NavigableSet;

public abstract class AbstractMetrics implements Metrics {

protected volatile boolean shutdown = false;
protected final long interval;
protected final MBeanServerConnection remote;
protected final String keySpace;

AbstractMetrics(long interval, MBeanServerConnection remote, String keySpace) {
this.interval = interval;
this.remote = remote;
this.keySpace = keySpace;
}

void printMetrics(NavigableSet<ResultItem> readResult, NavigableSet<ResultItem> writeResult) {
//clear console
System.out.print("\033[H\033[2J");
System.out.flush();

System.out.println("Cassandra top v0.2");
System.out.println();
System.out.println(new Date() + " / " + interval + "s");
System.out.println();

int width = SttySupport.getTerminalWidth();
int height = SttySupport.getTerminalHeight();

int posWrite = width / 2;
String leftStr = "Reads", rightStr = "Writes";

System.out.println(makeLine(leftStr, rightStr, posWrite));
System.out.println();
Iterator<ResultItem> readIt = readResult.iterator();
Iterator<ResultItem> writeIt = writeResult.iterator();
Long maxReadCount = null, maxWriteCount = null;
for(int i = 7; i < height; i++) {
if (readIt.hasNext()) {
ResultItem resultItem = readIt.next();
if (maxReadCount == null) maxReadCount = resultItem.count;
leftStr = formatCounter(resultItem, maxReadCount);
} else {
leftStr = "";
}
if (writeIt.hasNext()) {
ResultItem resultItem = writeIt.next();
if (maxWriteCount == null) maxWriteCount = resultItem.count;
rightStr = formatCounter(resultItem, maxWriteCount);
} else {
rightStr = "";
}
if (leftStr.length() == 0 && rightStr.length() == 0) break;
System.out.println(makeLine(leftStr, rightStr, posWrite));
}
}

@Override public void shutdown() {
shutdown = true;
}

protected String formatCounter(ResultItem resultItem, long maxCount) {
int maxLen = String.valueOf(maxCount).length();
return StringUtils.leftPad(String.valueOf(resultItem.count), maxLen + 1) + " " + resultItem;
}

protected String makeLine(String left, String right, int rightPos) {
if (left.length() > rightPos) left = left.substring(0, rightPos);
if (right.length() > rightPos) left = right.substring(0, rightPos);
return StringUtils.rightPad(left, rightPos) + " " + right;
}

}
74 changes: 74 additions & 0 deletions src/main/java/com/pixonic/ctop/metrics/CurrentMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.pixonic.ctop.metrics;

import javax.management.*;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

public class CurrentMetrics extends AbstractMetrics {

private static final String KEY_PROPERTY = "scope";
private static final String ATTRIBUTE = "Count";

CurrentMetrics(long interval, MBeanServerConnection remote, String keySpace) {
super(interval, remote, keySpace);
}

@Override public void printMetrics() throws Exception {
ObjectName readObjectName = new ObjectName("org.apache.cassandra.metrics:type=Table,keyspace=" + keySpace + ",scope=*,name=ReadLatency");
ObjectName writeObjectName = new ObjectName("org.apache.cassandra.metrics:type=Table,keyspace=" + keySpace + ",scope=*,name=WriteLatency");

List<MonitoringEntry> readItems = getMonitoringEntryList(remote, readObjectName);
List<MonitoringEntry> writeItems = getMonitoringEntryList(remote, writeObjectName);

while (!shutdown) {
Thread.sleep(TimeUnit.SECONDS.toMillis(interval));
super.printMetrics(createResultItems(readItems), createResultItems(writeItems));
}
}

private NavigableSet<ResultItem> createResultItems(List<MonitoringEntry> monitoringItems) throws AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException, IOException {
NavigableSet<ResultItem> resultItems = new TreeSet<>();

for (MonitoringEntry item : monitoringItems) {
MonitoringEntry monitoringEntry = calculateDifference(remote, item);
if (monitoringEntry.count > 0) resultItems.add(new ResultItem(item.objectName, KEY_PROPERTY, monitoringEntry.count));
}

return resultItems;
}

private MonitoringEntry calculateDifference(MBeanServerConnection remote, MonitoringEntry item) throws AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException, IOException {
Long count = (Long) remote.getAttribute(item.objectName, ATTRIBUTE);

long diff = count - item.count;

item.count = count;
return new MonitoringEntry(item.objectName, diff);
}

private List<MonitoringEntry> getMonitoringEntryList(MBeanServerConnection remote, ObjectName objectName) throws IOException, AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException {
List<MonitoringEntry> monitoringList = new LinkedList<>();

for (ObjectName mbean : remote.queryNames(objectName, null)) {
Long count = (Long) remote.getAttribute(mbean, ATTRIBUTE);
if (count > 0) monitoringList.add(new MonitoringEntry(mbean, count));
}

return monitoringList;
}

private static class MonitoringEntry {
private ObjectName objectName;
private long count;

MonitoringEntry(ObjectName objectName, long count) {
this.objectName = objectName;
this.count = count;
}
}

}
9 changes: 9 additions & 0 deletions src/main/java/com/pixonic/ctop/metrics/Metrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.pixonic.ctop.metrics;

public interface Metrics {

void printMetrics() throws Exception;

void shutdown();

}
15 changes: 15 additions & 0 deletions src/main/java/com/pixonic/ctop/metrics/MetricsFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.pixonic.ctop.metrics;

import javax.management.MBeanServerConnection;

public class MetricsFactory {

public static Metrics getMetrics(int version, long interval, MBeanServerConnection remote, String keySpace) {
if (version >= 3) {
return new CurrentMetrics(interval, remote, keySpace);
} else {
return new OldMetrics(interval, remote, keySpace);
}
}

}
Loading