Skip to content

Commit

Permalink
Implement support for Cassandra 3.x version (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
AntonGabov authored May 11, 2018
1 parent 49a34e9 commit feb4b1b
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 121 deletions.
133 changes: 12 additions & 121 deletions src/main/java/com/pixonic/ctop/Main.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.pixonic.ctop;

import javax.management.*;
import com.pixonic.ctop.metrics.Metrics;
import com.pixonic.ctop.metrics.MetricsFactory;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;

public class Main {
private static volatile boolean shutdown = false;

public static void main(String[] args) throws Exception {
if (args.length == 0) {
Expand All @@ -24,131 +24,22 @@ public static void main(String[] args) throws Exception {
interval = Integer.parseInt(args[2]);
}

Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown = true));

System.out.println("Connecting to " + hostAndPort + "...");

final JMXServiceURL target = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + hostAndPort + "/jmxrmi");

final JMXConnector connector = JMXConnectorFactory.connect(target);

final MBeanServerConnection remote = connector.getMBeanServerConnection();

ObjectName objectName = new ObjectName("org.apache.cassandra.db:type=ColumnFamilies,keyspace=" + keySpace + ",columnfamily=*");

List<MonitoringEntry> items = new LinkedList<>();
for (ObjectName mbean : remote.queryNames(objectName, null)) {
AttributeList counts = remote.getAttributes(mbean, new String[] {"ReadCount", "WriteCount"});
long readCount = (Long) (((Attribute) counts.get(0)).getValue());
long writeCount = (Long) (((Attribute) counts.get(1)).getValue());
if (readCount > 0 || writeCount > 0) items.add(new MonitoringEntry(mbean, readCount, writeCount));
}
ObjectName storageMBean = new ObjectName("org.apache.cassandra.db:type=StorageService");
String releaseVersion = (String) remote.getAttribute(storageMBean, "ReleaseVersion");
int majorVersion = Integer.valueOf(releaseVersion.substring(0, releaseVersion.indexOf('.')));
System.out.println("Cassandra version is " + releaseVersion);

Metrics metrics = MetricsFactory.getMetrics(majorVersion, interval, remote, keySpace);
System.out.println("Connected. Gathering data...");
metrics.printMetrics();

while (!shutdown) {
Thread.sleep(TimeUnit.SECONDS.toMillis(interval));
Set<ResultItem> readResult = new TreeSet<>();
Set<ResultItem> writeResult = new TreeSet<>();
for (MonitoringEntry item : items) {
MonitoringEntry resultItem = calculateDifference(remote, item);
if (resultItem.readCount > 0) readResult.add(new ResultItem(item.cf, resultItem.readCount));
if (resultItem.writeCount > 0) writeResult.add(new ResultItem(item.cf, resultItem.writeCount));
}

//clear console
System.out.print("\033[H\033[2J");
System.out.flush();

System.out.println("Cassandra top v0.1");
System.out.println();
System.out.println(new Date() + " / " + interval + "s");
System.out.println();

int width = SttySupport.getTerminalWidth();
int height = SttySupport.getTerminalHeight();

int posWrite = width / 2;
String leftStr = "Reads", rightStr = "Writes";

System.out.println(makeLine(leftStr, rightStr, posWrite));
System.out.println();
Iterator<ResultItem> readIt = readResult.iterator();
Iterator<ResultItem> writeIt = writeResult.iterator();
Long maxReadCount = null, maxWriteCount = null;
for(int i = 7; i < height; i++) {
if (readIt.hasNext()) {
ResultItem resultItem = readIt.next();
if (maxReadCount == null) maxReadCount = resultItem.count;
leftStr = formatCounter(resultItem, maxReadCount);
} else {
leftStr = "";
}
if (writeIt.hasNext()) {
ResultItem resultItem = writeIt.next();
if (maxWriteCount == null) maxWriteCount = resultItem.count;
rightStr = formatCounter(resultItem, maxWriteCount);
} else {
rightStr = "";
}
if (leftStr.length() == 0 && rightStr.length() == 0) break;
System.out.println(makeLine(leftStr, rightStr, posWrite));
}
}
}

private static String formatCounter(ResultItem resultItem, long maxCount) {
int maxLen = String.valueOf(maxCount).length();
return StringUtils.leftPad(String.valueOf(resultItem.count), maxLen + 1) + " " + resultItem;
}

private static String makeLine(String left, String right, int rightPos) {
if (left.length() > rightPos) left = left.substring(0, rightPos);
if (right.length() > rightPos) left = right.substring(0, rightPos);
return StringUtils.rightPad(left, rightPos) + " " + right;
Runtime.getRuntime().addShutdownHook(new Thread(metrics::shutdown));
}

private static MonitoringEntry calculateDifference(MBeanServerConnection remote, MonitoringEntry item) throws AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException, IOException {
AttributeList counts = remote.getAttributes(item.cf, new String[] {"ReadCount", "WriteCount"});
long readCount = (Long) (((Attribute) counts.get(0)).getValue());
long writeCount = (Long) (((Attribute) counts.get(1)).getValue());

long readDf = readCount - item.readCount;
long writeDf = writeCount - item.writeCount;

item.readCount = readCount;
item.writeCount = writeCount;
return new MonitoringEntry(item.cf, readDf, writeDf);
}

private static class MonitoringEntry {
private final ObjectName cf;
private long readCount;
private long writeCount;

public MonitoringEntry(ObjectName cf, long readCount, long writeCount) {
this.cf = cf;
this.readCount = readCount;
this.writeCount = writeCount;
}
}

private static class ResultItem implements Comparable<ResultItem> {
private final ObjectName cf;
private final long count;

public ResultItem(ObjectName cf, long count) {
this.cf = cf;
this.count = count;
}

@Override public int compareTo(ResultItem o) {
long d = count - o.count;
return -(d < 0 ? -1 : (d > 0 ? 1 : 0));
}

@Override public String toString() {
return cf.getKeyProperty("columnfamily");
}
}
}
80 changes: 80 additions & 0 deletions src/main/java/com/pixonic/ctop/metrics/AbstractMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.pixonic.ctop.metrics;

import com.pixonic.ctop.StringUtils;
import com.pixonic.ctop.SttySupport;

import javax.management.MBeanServerConnection;
import java.util.Date;
import java.util.Iterator;
import java.util.NavigableSet;

public abstract class AbstractMetrics implements Metrics {

protected volatile boolean shutdown = false;
protected final long interval;
protected final MBeanServerConnection remote;
protected final String keySpace;

AbstractMetrics(long interval, MBeanServerConnection remote, String keySpace) {
this.interval = interval;
this.remote = remote;
this.keySpace = keySpace;
}

void printMetrics(NavigableSet<ResultItem> readResult, NavigableSet<ResultItem> writeResult) {
//clear console
System.out.print("\033[H\033[2J");
System.out.flush();

System.out.println("Cassandra top v0.2");
System.out.println();
System.out.println(new Date() + " / " + interval + "s");
System.out.println();

int width = SttySupport.getTerminalWidth();
int height = SttySupport.getTerminalHeight();

int posWrite = width / 2;
String leftStr = "Reads", rightStr = "Writes";

System.out.println(makeLine(leftStr, rightStr, posWrite));
System.out.println();
Iterator<ResultItem> readIt = readResult.iterator();
Iterator<ResultItem> writeIt = writeResult.iterator();
Long maxReadCount = null, maxWriteCount = null;
for(int i = 7; i < height; i++) {
if (readIt.hasNext()) {
ResultItem resultItem = readIt.next();
if (maxReadCount == null) maxReadCount = resultItem.count;
leftStr = formatCounter(resultItem, maxReadCount);
} else {
leftStr = "";
}
if (writeIt.hasNext()) {
ResultItem resultItem = writeIt.next();
if (maxWriteCount == null) maxWriteCount = resultItem.count;
rightStr = formatCounter(resultItem, maxWriteCount);
} else {
rightStr = "";
}
if (leftStr.length() == 0 && rightStr.length() == 0) break;
System.out.println(makeLine(leftStr, rightStr, posWrite));
}
}

@Override public void shutdown() {
shutdown = true;
}

protected String formatCounter(ResultItem resultItem, long maxCount) {
int maxLen = String.valueOf(maxCount).length();
return StringUtils.leftPad(String.valueOf(resultItem.count), maxLen + 1) + " " + resultItem;
}

protected String makeLine(String left, String right, int rightPos) {
if (left.length() > rightPos) left = left.substring(0, rightPos);
if (right.length() > rightPos) left = right.substring(0, rightPos);
return StringUtils.rightPad(left, rightPos) + " " + right;
}

}
74 changes: 74 additions & 0 deletions src/main/java/com/pixonic/ctop/metrics/CurrentMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.pixonic.ctop.metrics;

import javax.management.*;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

public class CurrentMetrics extends AbstractMetrics {

private static final String KEY_PROPERTY = "scope";
private static final String ATTRIBUTE = "Count";

CurrentMetrics(long interval, MBeanServerConnection remote, String keySpace) {
super(interval, remote, keySpace);
}

@Override public void printMetrics() throws Exception {
ObjectName readObjectName = new ObjectName("org.apache.cassandra.metrics:type=Table,keyspace=" + keySpace + ",scope=*,name=ReadLatency");
ObjectName writeObjectName = new ObjectName("org.apache.cassandra.metrics:type=Table,keyspace=" + keySpace + ",scope=*,name=WriteLatency");

List<MonitoringEntry> readItems = getMonitoringEntryList(remote, readObjectName);
List<MonitoringEntry> writeItems = getMonitoringEntryList(remote, writeObjectName);

while (!shutdown) {
Thread.sleep(TimeUnit.SECONDS.toMillis(interval));
super.printMetrics(createResultItems(readItems), createResultItems(writeItems));
}
}

private NavigableSet<ResultItem> createResultItems(List<MonitoringEntry> monitoringItems) throws AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException, IOException {
NavigableSet<ResultItem> resultItems = new TreeSet<>();

for (MonitoringEntry item : monitoringItems) {
MonitoringEntry monitoringEntry = calculateDifference(remote, item);
if (monitoringEntry.count > 0) resultItems.add(new ResultItem(item.objectName, KEY_PROPERTY, monitoringEntry.count));
}

return resultItems;
}

private MonitoringEntry calculateDifference(MBeanServerConnection remote, MonitoringEntry item) throws AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException, IOException {
Long count = (Long) remote.getAttribute(item.objectName, ATTRIBUTE);

long diff = count - item.count;

item.count = count;
return new MonitoringEntry(item.objectName, diff);
}

private List<MonitoringEntry> getMonitoringEntryList(MBeanServerConnection remote, ObjectName objectName) throws IOException, AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException {
List<MonitoringEntry> monitoringList = new LinkedList<>();

for (ObjectName mbean : remote.queryNames(objectName, null)) {
Long count = (Long) remote.getAttribute(mbean, ATTRIBUTE);
if (count > 0) monitoringList.add(new MonitoringEntry(mbean, count));
}

return monitoringList;
}

private static class MonitoringEntry {
private ObjectName objectName;
private long count;

MonitoringEntry(ObjectName objectName, long count) {
this.objectName = objectName;
this.count = count;
}
}

}
9 changes: 9 additions & 0 deletions src/main/java/com/pixonic/ctop/metrics/Metrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.pixonic.ctop.metrics;

public interface Metrics {

void printMetrics() throws Exception;

void shutdown();

}
15 changes: 15 additions & 0 deletions src/main/java/com/pixonic/ctop/metrics/MetricsFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.pixonic.ctop.metrics;

import javax.management.MBeanServerConnection;

public class MetricsFactory {

public static Metrics getMetrics(int version, long interval, MBeanServerConnection remote, String keySpace) {
if (version >= 3) {
return new CurrentMetrics(interval, remote, keySpace);
} else {
return new OldMetrics(interval, remote, keySpace);
}
}

}
Loading

0 comments on commit feb4b1b

Please sign in to comment.