From feb4b1be7945c69f336d19b81a41d04a7336ea51 Mon Sep 17 00:00:00 2001 From: Anton Gabov Date: Fri, 11 May 2018 14:32:53 +0300 Subject: [PATCH] Implement support for Cassandra 3.x version (#1) --- src/main/java/com/pixonic/ctop/Main.java | 133 ++---------------- .../pixonic/ctop/metrics/AbstractMetrics.java | 80 +++++++++++ .../pixonic/ctop/metrics/CurrentMetrics.java | 74 ++++++++++ .../com/pixonic/ctop/metrics/Metrics.java | 9 ++ .../pixonic/ctop/metrics/MetricsFactory.java | 15 ++ .../com/pixonic/ctop/metrics/OldMetrics.java | 72 ++++++++++ .../com/pixonic/ctop/metrics/ResultItem.java | 24 ++++ 7 files changed, 286 insertions(+), 121 deletions(-) create mode 100644 src/main/java/com/pixonic/ctop/metrics/AbstractMetrics.java create mode 100644 src/main/java/com/pixonic/ctop/metrics/CurrentMetrics.java create mode 100644 src/main/java/com/pixonic/ctop/metrics/Metrics.java create mode 100644 src/main/java/com/pixonic/ctop/metrics/MetricsFactory.java create mode 100644 src/main/java/com/pixonic/ctop/metrics/OldMetrics.java create mode 100644 src/main/java/com/pixonic/ctop/metrics/ResultItem.java diff --git a/src/main/java/com/pixonic/ctop/Main.java b/src/main/java/com/pixonic/ctop/Main.java index 929295a..ec09bca 100644 --- a/src/main/java/com/pixonic/ctop/Main.java +++ b/src/main/java/com/pixonic/ctop/Main.java @@ -1,15 +1,15 @@ package com.pixonic.ctop; -import javax.management.*; +import com.pixonic.ctop.metrics.Metrics; +import com.pixonic.ctop.metrics.MetricsFactory; + +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.TimeUnit; public class Main { - private static volatile boolean shutdown = false; public static void main(String[] args) throws Exception { if (args.length == 0) { @@ -24,131 +24,22 @@ public static void main(String[] args) throws Exception { interval = Integer.parseInt(args[2]); } - Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown = true)); - System.out.println("Connecting to " + hostAndPort + "..."); final JMXServiceURL target = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + hostAndPort + "/jmxrmi"); - final JMXConnector connector = JMXConnectorFactory.connect(target); - final MBeanServerConnection remote = connector.getMBeanServerConnection(); - ObjectName objectName = new ObjectName("org.apache.cassandra.db:type=ColumnFamilies,keyspace=" + keySpace + ",columnfamily=*"); - - List items = new LinkedList<>(); - for (ObjectName mbean : remote.queryNames(objectName, null)) { - AttributeList counts = remote.getAttributes(mbean, new String[] {"ReadCount", "WriteCount"}); - long readCount = (Long) (((Attribute) counts.get(0)).getValue()); - long writeCount = (Long) (((Attribute) counts.get(1)).getValue()); - if (readCount > 0 || writeCount > 0) items.add(new MonitoringEntry(mbean, readCount, writeCount)); - } + ObjectName storageMBean = new ObjectName("org.apache.cassandra.db:type=StorageService"); + String releaseVersion = (String) remote.getAttribute(storageMBean, "ReleaseVersion"); + int majorVersion = Integer.valueOf(releaseVersion.substring(0, releaseVersion.indexOf('.'))); + System.out.println("Cassandra version is " + releaseVersion); + Metrics metrics = MetricsFactory.getMetrics(majorVersion, interval, remote, keySpace); System.out.println("Connected. Gathering data..."); + metrics.printMetrics(); - while (!shutdown) { - Thread.sleep(TimeUnit.SECONDS.toMillis(interval)); - Set readResult = new TreeSet<>(); - Set writeResult = new TreeSet<>(); - for (MonitoringEntry item : items) { - MonitoringEntry resultItem = calculateDifference(remote, item); - if (resultItem.readCount > 0) readResult.add(new ResultItem(item.cf, resultItem.readCount)); - if (resultItem.writeCount > 0) writeResult.add(new ResultItem(item.cf, resultItem.writeCount)); - } - - //clear console - System.out.print("\033[H\033[2J"); - System.out.flush(); - - System.out.println("Cassandra top v0.1"); - System.out.println(); - System.out.println(new Date() + " / " + interval + "s"); - System.out.println(); - - int width = SttySupport.getTerminalWidth(); - int height = SttySupport.getTerminalHeight(); - - int posWrite = width / 2; - String leftStr = "Reads", rightStr = "Writes"; - - System.out.println(makeLine(leftStr, rightStr, posWrite)); - System.out.println(); - Iterator readIt = readResult.iterator(); - Iterator writeIt = writeResult.iterator(); - Long maxReadCount = null, maxWriteCount = null; - for(int i = 7; i < height; i++) { - if (readIt.hasNext()) { - ResultItem resultItem = readIt.next(); - if (maxReadCount == null) maxReadCount = resultItem.count; - leftStr = formatCounter(resultItem, maxReadCount); - } else { - leftStr = ""; - } - if (writeIt.hasNext()) { - ResultItem resultItem = writeIt.next(); - if (maxWriteCount == null) maxWriteCount = resultItem.count; - rightStr = formatCounter(resultItem, maxWriteCount); - } else { - rightStr = ""; - } - if (leftStr.length() == 0 && rightStr.length() == 0) break; - System.out.println(makeLine(leftStr, rightStr, posWrite)); - } - } - } - - private static String formatCounter(ResultItem resultItem, long maxCount) { - int maxLen = String.valueOf(maxCount).length(); - return StringUtils.leftPad(String.valueOf(resultItem.count), maxLen + 1) + " " + resultItem; - } - - private static String makeLine(String left, String right, int rightPos) { - if (left.length() > rightPos) left = left.substring(0, rightPos); - if (right.length() > rightPos) left = right.substring(0, rightPos); - return StringUtils.rightPad(left, rightPos) + " " + right; + Runtime.getRuntime().addShutdownHook(new Thread(metrics::shutdown)); } - private static MonitoringEntry calculateDifference(MBeanServerConnection remote, MonitoringEntry item) throws AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException, IOException { - AttributeList counts = remote.getAttributes(item.cf, new String[] {"ReadCount", "WriteCount"}); - long readCount = (Long) (((Attribute) counts.get(0)).getValue()); - long writeCount = (Long) (((Attribute) counts.get(1)).getValue()); - - long readDf = readCount - item.readCount; - long writeDf = writeCount - item.writeCount; - - item.readCount = readCount; - item.writeCount = writeCount; - return new MonitoringEntry(item.cf, readDf, writeDf); - } - - private static class MonitoringEntry { - private final ObjectName cf; - private long readCount; - private long writeCount; - - public MonitoringEntry(ObjectName cf, long readCount, long writeCount) { - this.cf = cf; - this.readCount = readCount; - this.writeCount = writeCount; - } - } - - private static class ResultItem implements Comparable { - private final ObjectName cf; - private final long count; - - public ResultItem(ObjectName cf, long count) { - this.cf = cf; - this.count = count; - } - - @Override public int compareTo(ResultItem o) { - long d = count - o.count; - return -(d < 0 ? -1 : (d > 0 ? 1 : 0)); - } - - @Override public String toString() { - return cf.getKeyProperty("columnfamily"); - } - } } diff --git a/src/main/java/com/pixonic/ctop/metrics/AbstractMetrics.java b/src/main/java/com/pixonic/ctop/metrics/AbstractMetrics.java new file mode 100644 index 0000000..7725f4a --- /dev/null +++ b/src/main/java/com/pixonic/ctop/metrics/AbstractMetrics.java @@ -0,0 +1,80 @@ +package com.pixonic.ctop.metrics; + +import com.pixonic.ctop.StringUtils; +import com.pixonic.ctop.SttySupport; + +import javax.management.MBeanServerConnection; +import java.util.Date; +import java.util.Iterator; +import java.util.NavigableSet; + +public abstract class AbstractMetrics implements Metrics { + + protected volatile boolean shutdown = false; + protected final long interval; + protected final MBeanServerConnection remote; + protected final String keySpace; + + AbstractMetrics(long interval, MBeanServerConnection remote, String keySpace) { + this.interval = interval; + this.remote = remote; + this.keySpace = keySpace; + } + + void printMetrics(NavigableSet readResult, NavigableSet writeResult) { + //clear console + System.out.print("\033[H\033[2J"); + System.out.flush(); + + System.out.println("Cassandra top v0.2"); + System.out.println(); + System.out.println(new Date() + " / " + interval + "s"); + System.out.println(); + + int width = SttySupport.getTerminalWidth(); + int height = SttySupport.getTerminalHeight(); + + int posWrite = width / 2; + String leftStr = "Reads", rightStr = "Writes"; + + System.out.println(makeLine(leftStr, rightStr, posWrite)); + System.out.println(); + Iterator readIt = readResult.iterator(); + Iterator writeIt = writeResult.iterator(); + Long maxReadCount = null, maxWriteCount = null; + for(int i = 7; i < height; i++) { + if (readIt.hasNext()) { + ResultItem resultItem = readIt.next(); + if (maxReadCount == null) maxReadCount = resultItem.count; + leftStr = formatCounter(resultItem, maxReadCount); + } else { + leftStr = ""; + } + if (writeIt.hasNext()) { + ResultItem resultItem = writeIt.next(); + if (maxWriteCount == null) maxWriteCount = resultItem.count; + rightStr = formatCounter(resultItem, maxWriteCount); + } else { + rightStr = ""; + } + if (leftStr.length() == 0 && rightStr.length() == 0) break; + System.out.println(makeLine(leftStr, rightStr, posWrite)); + } + } + + @Override public void shutdown() { + shutdown = true; + } + + protected String formatCounter(ResultItem resultItem, long maxCount) { + int maxLen = String.valueOf(maxCount).length(); + return StringUtils.leftPad(String.valueOf(resultItem.count), maxLen + 1) + " " + resultItem; + } + + protected String makeLine(String left, String right, int rightPos) { + if (left.length() > rightPos) left = left.substring(0, rightPos); + if (right.length() > rightPos) left = right.substring(0, rightPos); + return StringUtils.rightPad(left, rightPos) + " " + right; + } + +} diff --git a/src/main/java/com/pixonic/ctop/metrics/CurrentMetrics.java b/src/main/java/com/pixonic/ctop/metrics/CurrentMetrics.java new file mode 100644 index 0000000..1977921 --- /dev/null +++ b/src/main/java/com/pixonic/ctop/metrics/CurrentMetrics.java @@ -0,0 +1,74 @@ +package com.pixonic.ctop.metrics; + +import javax.management.*; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +public class CurrentMetrics extends AbstractMetrics { + + private static final String KEY_PROPERTY = "scope"; + private static final String ATTRIBUTE = "Count"; + + CurrentMetrics(long interval, MBeanServerConnection remote, String keySpace) { + super(interval, remote, keySpace); + } + + @Override public void printMetrics() throws Exception { + ObjectName readObjectName = new ObjectName("org.apache.cassandra.metrics:type=Table,keyspace=" + keySpace + ",scope=*,name=ReadLatency"); + ObjectName writeObjectName = new ObjectName("org.apache.cassandra.metrics:type=Table,keyspace=" + keySpace + ",scope=*,name=WriteLatency"); + + List readItems = getMonitoringEntryList(remote, readObjectName); + List writeItems = getMonitoringEntryList(remote, writeObjectName); + + while (!shutdown) { + Thread.sleep(TimeUnit.SECONDS.toMillis(interval)); + super.printMetrics(createResultItems(readItems), createResultItems(writeItems)); + } + } + + private NavigableSet createResultItems(List monitoringItems) throws AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException, IOException { + NavigableSet resultItems = new TreeSet<>(); + + for (MonitoringEntry item : monitoringItems) { + MonitoringEntry monitoringEntry = calculateDifference(remote, item); + if (monitoringEntry.count > 0) resultItems.add(new ResultItem(item.objectName, KEY_PROPERTY, monitoringEntry.count)); + } + + return resultItems; + } + + private MonitoringEntry calculateDifference(MBeanServerConnection remote, MonitoringEntry item) throws AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException, IOException { + Long count = (Long) remote.getAttribute(item.objectName, ATTRIBUTE); + + long diff = count - item.count; + + item.count = count; + return new MonitoringEntry(item.objectName, diff); + } + + private List getMonitoringEntryList(MBeanServerConnection remote, ObjectName objectName) throws IOException, AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException { + List monitoringList = new LinkedList<>(); + + for (ObjectName mbean : remote.queryNames(objectName, null)) { + Long count = (Long) remote.getAttribute(mbean, ATTRIBUTE); + if (count > 0) monitoringList.add(new MonitoringEntry(mbean, count)); + } + + return monitoringList; + } + + private static class MonitoringEntry { + private ObjectName objectName; + private long count; + + MonitoringEntry(ObjectName objectName, long count) { + this.objectName = objectName; + this.count = count; + } + } + +} diff --git a/src/main/java/com/pixonic/ctop/metrics/Metrics.java b/src/main/java/com/pixonic/ctop/metrics/Metrics.java new file mode 100644 index 0000000..4bc04d3 --- /dev/null +++ b/src/main/java/com/pixonic/ctop/metrics/Metrics.java @@ -0,0 +1,9 @@ +package com.pixonic.ctop.metrics; + +public interface Metrics { + + void printMetrics() throws Exception; + + void shutdown(); + +} diff --git a/src/main/java/com/pixonic/ctop/metrics/MetricsFactory.java b/src/main/java/com/pixonic/ctop/metrics/MetricsFactory.java new file mode 100644 index 0000000..8c1003a --- /dev/null +++ b/src/main/java/com/pixonic/ctop/metrics/MetricsFactory.java @@ -0,0 +1,15 @@ +package com.pixonic.ctop.metrics; + +import javax.management.MBeanServerConnection; + +public class MetricsFactory { + + public static Metrics getMetrics(int version, long interval, MBeanServerConnection remote, String keySpace) { + if (version >= 3) { + return new CurrentMetrics(interval, remote, keySpace); + } else { + return new OldMetrics(interval, remote, keySpace); + } + } + +} diff --git a/src/main/java/com/pixonic/ctop/metrics/OldMetrics.java b/src/main/java/com/pixonic/ctop/metrics/OldMetrics.java new file mode 100644 index 0000000..8775ea9 --- /dev/null +++ b/src/main/java/com/pixonic/ctop/metrics/OldMetrics.java @@ -0,0 +1,72 @@ +package com.pixonic.ctop.metrics; + +import javax.management.*; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +public class OldMetrics extends AbstractMetrics { + + private static final String KEY_PROPERTY = "columnfamily"; + private static final String[] ATTRIBUTES = new String[] {"ReadCount", "WriteCount"}; + + OldMetrics(long interval, MBeanServerConnection remote, String keySpace) { + super(interval, remote, keySpace); + } + + @Override public void printMetrics() throws Exception { + ObjectName objectName = new ObjectName("org.apache.cassandra.db:type=ColumnFamilies,keyspace=" + keySpace + ",columnfamily=*"); + + List items = new LinkedList<>(); + for (ObjectName mbean : remote.queryNames(objectName, null)) { + AttributeList counts = remote.getAttributes(mbean, ATTRIBUTES); + long readCount = (Long) (((Attribute) counts.get(0)).getValue()); + long writeCount = (Long) (((Attribute) counts.get(1)).getValue()); + if (readCount > 0 || writeCount > 0) items.add(new MonitoringEntry(mbean, readCount, writeCount)); + } + + while (!shutdown) { + Thread.sleep(TimeUnit.SECONDS.toMillis(interval)); + NavigableSet readResult = new TreeSet<>(); + NavigableSet writeResult = new TreeSet<>(); + for (MonitoringEntry item : items) { + MonitoringEntry resultItem = calculateDifference(remote, item); + if (resultItem.readCount > 0) + readResult.add(new ResultItem(item.cf, KEY_PROPERTY, resultItem.readCount)); + if (resultItem.writeCount > 0) + writeResult.add(new ResultItem(item.cf, KEY_PROPERTY, resultItem.writeCount)); + } + + super.printMetrics(readResult, writeResult); + } + } + + private MonitoringEntry calculateDifference(MBeanServerConnection remote, MonitoringEntry item) throws InstanceNotFoundException, IOException, ReflectionException { + AttributeList counts = remote.getAttributes(item.cf, ATTRIBUTES); + long readCount = (Long) (((Attribute) counts.get(0)).getValue()); + long writeCount = (Long) (((Attribute) counts.get(1)).getValue()); + + long readDf = readCount - item.readCount; + long writeDf = writeCount - item.writeCount; + + item.readCount = readCount; + item.writeCount = writeCount; + return new MonitoringEntry(item.cf, readDf, writeDf); + } + + private static class MonitoringEntry { + private final ObjectName cf; + private long readCount; + private long writeCount; + + MonitoringEntry(ObjectName cf, long readCount, long writeCount) { + this.cf = cf; + this.readCount = readCount; + this.writeCount = writeCount; + } + } + +} diff --git a/src/main/java/com/pixonic/ctop/metrics/ResultItem.java b/src/main/java/com/pixonic/ctop/metrics/ResultItem.java new file mode 100644 index 0000000..bebb664 --- /dev/null +++ b/src/main/java/com/pixonic/ctop/metrics/ResultItem.java @@ -0,0 +1,24 @@ +package com.pixonic.ctop.metrics; + +import javax.management.ObjectName; + +public class ResultItem implements Comparable { + private final ObjectName cf; + private final String keyProperty; + final long count; + + ResultItem(ObjectName cf, String keyProperty, long count) { + this.cf = cf; + this.keyProperty = keyProperty; + this.count = count; + } + + @Override public int compareTo(ResultItem o) { + long d = count - o.count; + return -(d < 0 ? -1 : (d > 0 ? 1 : 0)); + } + + @Override public String toString() { + return cf.getKeyProperty(keyProperty); + } +}