Skip to content

Commit

Permalink
Refactor MBeanClient (opensource4you#519)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Jul 28, 2022
1 parent cb94360 commit 554d14d
Show file tree
Hide file tree
Showing 41 changed files with 325 additions and 363 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.metrics.jmx;
package org.astraea.app.metrics;

import static java.util.Map.Entry;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.metrics.jmx;
package org.astraea.app.metrics;

import java.util.HashMap;
import java.util.Hashtable;
Expand Down
2 changes: 0 additions & 2 deletions app/src/main/java/org/astraea/app/metrics/HasBeanObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.astraea.app.metrics;

import org.astraea.app.metrics.jmx.BeanObject;

public interface HasBeanObject {
BeanObject beanObject();

Expand Down
2 changes: 0 additions & 2 deletions app/src/main/java/org/astraea/app/metrics/KafkaMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.astraea.app.metrics.broker.BrokerTopicMetricsResult;
import org.astraea.app.metrics.broker.HasValue;
import org.astraea.app.metrics.broker.TotalTimeMs;
import org.astraea.app.metrics.jmx.BeanQuery;
import org.astraea.app.metrics.jmx.MBeanClient;
import org.astraea.app.metrics.platform.JvmMemory;
import org.astraea.app.metrics.platform.OperatingSystemInfo;
import org.astraea.app.metrics.producer.HasProducerNodeMetrics;
Expand Down
282 changes: 282 additions & 0 deletions app/src/main/java/org/astraea/app/metrics/MBeanClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.metrics;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.stream.Collectors;
import javax.management.InstanceNotFoundException;
import javax.management.IntrospectionException;
import javax.management.MBeanFeatureInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.astraea.app.common.Utils;

/**
* A MBeanClient used to retrieve mbean value from remote Jmx server.
*
* <pre>{@code
* try(MBeanClient client = new MBeanClient(jmxConnectorServer.getAddress())) {
* BeanObject bean = client.queryBean(BeanQuery.builder("java.lang")
* .property("type", "MemoryManager")
* .property("name", "CodeCacheManager")
* .build());
* System.out.println(bean.getAttributes());
* }</pre>
*/
public interface MBeanClient extends AutoCloseable {

/**
* @param host the address of jmx server
* @param port the port of jmx server
* @return a mbean client using JNDI to lookup metrics.
*/
static MBeanClient jndi(String host, int port) {
try {
return of(
new JMXServiceURL(
String.format(
"service:jmx:rmi://%s:%s/jndi/rmi://%s:%s/jmxrmi", host, port, host, port)));
} catch (MalformedURLException e) {
throw new IllegalArgumentException(e);
}
}

static MBeanClient of(JMXServiceURL jmxServiceURL) {
try {
var jmxConnector = JMXConnectorFactory.connect(jmxServiceURL);
return new AbstractMBeanClient(jmxConnector.getMBeanServerConnection()) {
@Override
public String host() {
return jmxServiceURL.getHost();
}

@Override
public int port() {
return jmxServiceURL.getPort();
}

@Override
public void close() {
Utils.packException(jmxConnector::close);
}
};
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

static MBeanClient local() {
return new AbstractMBeanClient(ManagementFactory.getPlatformMBeanServer()) {
@Override
public String host() {
return Utils.hostname();
}

@Override
public int port() {
return -1;
}

@Override
public void close() {}
};
}

/**
* Fetch all attributes of target mbean.
*
* <p>Note that when exception is raised during the attribute fetching process, the exact
* exception will be placed into the attribute field.
*
* @param beanQuery the non-pattern BeanQuery
* @return A {@link BeanObject} contain all attributes if target resolved successfully.
*/
BeanObject queryBean(BeanQuery beanQuery);

/**
* Fetch given attributes of target mbean
*
* <p>Note that when exception is raised during the attribute fetching process, the exact
* exception will be placed into the attribute field.
*
* @param beanQuery the non-pattern BeanQuery
* @param attributeNameCollection a list of attribute you want to retrieve
* @return A {@link BeanObject} contain given specific attributes if target resolved successfully.
*/
BeanObject queryBean(BeanQuery beanQuery, Collection<String> attributeNameCollection);

/**
* Query mBeans by pattern.
*
* <p>Query mbeans by {@link ObjectName} pattern, the returned {@link BeanObject}s will contain
* all the available attributes
*
* <p>Note that when exception is raised during the attribute fetching process, the exact
* exception will be placed into the attribute field.
*
* @param beanQuery the pattern to query
* @return A {@link Set} of {@link BeanObject}, all BeanObject has its own attributes resolved.
*/
Collection<BeanObject> queryBeans(BeanQuery beanQuery);

/**
* Returns the list of domains in which any MBean is currently registered.
*
* <p>The order of strings within the returned array is not defined.
*
* @return a {@link List} of domain name {@link String}
*/
List<String> listDomains();

/** @return the host address of jmx server */
String host();

/** @return the port listened by jmx server */
int port();

@Override
void close();

abstract class AbstractMBeanClient implements MBeanClient {

private final MBeanServerConnection connection;

AbstractMBeanClient(MBeanServerConnection connection) {
this.connection = connection;
}

@Override
public BeanObject queryBean(BeanQuery beanQuery) {
try {
// ask for MBeanInfo
var mBeanInfo = connection.getMBeanInfo(beanQuery.objectName());

// create a list builder all available attributes name
var attributeName =
Arrays.stream(mBeanInfo.getAttributes())
.map(MBeanFeatureInfo::getName)
.collect(Collectors.toList());

// query the result
return queryBean(beanQuery, attributeName);
} catch (ReflectionException | IntrospectionException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (InstanceNotFoundException e) {
throw new NoSuchElementException(e.getMessage());
}
}

@Override
public BeanObject queryBean(BeanQuery beanQuery, Collection<String> attributeNameCollection) {
try {

// fetch attribute value from mbean server
var attributeNameArray = attributeNameCollection.toArray(new String[0]);
var attributeList =
connection.getAttributes(beanQuery.objectName(), attributeNameArray).asList();

// collect attribute name & value into a map
var attributes = new HashMap<String, Object>();
attributeList.forEach(
attribute -> attributes.put(attribute.getName(), attribute.getValue()));

// according to the javadoc of MBeanServerConnection#getAttributes, the API will
// ignore any
// error occurring during the fetch process (for example, attribute not exists). Below code
// check for such condition and try to figure out what exactly the error is. put it into
// attributes return result.
var notResolvedAttributes =
Arrays.stream(attributeNameArray)
.filter(str -> !attributes.containsKey(str))
.collect(Collectors.toSet());
notResolvedAttributes.forEach(
attributeName ->
attributes.put(
attributeName, fetchAttributeObjectOrException(beanQuery, attributeName)));

// collect result, and build a new BeanObject as return result
return new BeanObject(beanQuery.domainName(), beanQuery.properties(), attributes);

} catch (ReflectionException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (InstanceNotFoundException e) {
throw new NoSuchElementException(e.getMessage());
}
}

private Object fetchAttributeObjectOrException(BeanQuery beanQuery, String attributeName) {
// It is possible to trigger some unexpected runtime exception during the following call.
// For example, on my machine when I try to get attribute "BootClassPath" from
// "java.lang:type=Runtime".
// I will get a {@link java.lang.UnsupportedOperationException} indicates that "Boot class
// path
// mechanism is not supported". Those attribute actually exists, but I cannot retrieve those
// attribute value. Doing so I get that error.
//
// Instead of blinding that attribute from the library user, I decided to put the
// exception
// into their result.
try {
return connection.getAttribute(beanQuery.objectName(), attributeName);
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (Exception e) {
return e;
}
}

@Override
public Collection<BeanObject> queryBeans(BeanQuery beanQuery) {
try {
return connection.queryMBeans(beanQuery.objectName(), null).stream()
.map(ObjectInstance::getObjectName)
.map(BeanQuery::fromObjectName)
.map(this::queryBean)
.collect(Collectors.toSet());

} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public List<String> listDomains() {
try {
return Arrays.asList(connection.getDomains());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
}
3 changes: 0 additions & 3 deletions app/src/main/java/org/astraea/app/metrics/MetricExplorer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@
import javax.management.openmbean.CompositeDataSupport;
import javax.management.remote.JMXServiceURL;
import org.astraea.app.argument.Field;
import org.astraea.app.metrics.jmx.BeanObject;
import org.astraea.app.metrics.jmx.BeanQuery;
import org.astraea.app.metrics.jmx.MBeanClient;

public class MetricExplorer {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import java.util.Map;
import java.util.Objects;
import org.astraea.app.metrics.jmx.BeanObject;
import org.astraea.app.metrics.BeanObject;

public class BrokerTopicMetricsResult implements HasCount, HasEventType, HasRate {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.astraea.app.metrics.broker;

import org.astraea.app.metrics.BeanObject;
import org.astraea.app.metrics.HasBeanObject;
import org.astraea.app.metrics.jmx.BeanObject;

public interface HasValue extends HasBeanObject {
default long value() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.astraea.app.metrics.BeanObject;
import org.astraea.app.metrics.BeanQuery;
import org.astraea.app.metrics.HasBeanObject;
import org.astraea.app.metrics.jmx.BeanObject;
import org.astraea.app.metrics.jmx.BeanQuery;
import org.astraea.app.metrics.jmx.MBeanClient;
import org.astraea.app.metrics.MBeanClient;

public final class LogMetrics {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;
import org.astraea.app.metrics.jmx.BeanObject;
import org.astraea.app.metrics.jmx.BeanQuery;
import org.astraea.app.metrics.jmx.MBeanClient;
import org.astraea.app.metrics.BeanObject;
import org.astraea.app.metrics.BeanQuery;
import org.astraea.app.metrics.MBeanClient;

public final class ServerMetrics {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.astraea.app.metrics.broker;

import java.util.Map;
import org.astraea.app.metrics.jmx.BeanObject;
import org.astraea.app.metrics.BeanObject;

public class TotalTimeMs implements HasPercentiles, HasCount, HasStatistics {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.astraea.app.common.Utils;
import org.astraea.app.metrics.HasBeanObject;
import org.astraea.app.metrics.KafkaMetrics;
import org.astraea.app.metrics.jmx.MBeanClient;
import org.astraea.app.metrics.MBeanClient;

public class BeanCollector {

Expand Down
Loading

0 comments on commit 554d14d

Please sign in to comment.