Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ALLUXIO-2182] Added a metrics master for aggregating the client metrics #7251

Merged
merged 29 commits into from
May 18, 2018
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public final class AlluxioBlockStore {
* @return the {@link AlluxioBlockStore} created
*/
public static AlluxioBlockStore create() {
return create(FileSystemContext.INSTANCE);
return create(FileSystemContext.get());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import alluxio.PropertyKey;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions;
import alluxio.metrics.ClientMetrics;
import alluxio.metrics.MetricsSystem;
import alluxio.network.netty.NettyRPC;
import alluxio.network.netty.NettyRPCContext;
import alluxio.network.protocol.databuffer.DataBuffer;
Expand Down Expand Up @@ -69,6 +71,7 @@ public DataBuffer readPacket() throws IOException {
ByteBuffer buffer = mReader.read(mPos, Math.min(mPacketSize, mEnd - mPos));
DataBuffer dataBuffer = new DataByteBuffer(buffer, buffer.remaining());
mPos += dataBuffer.getLength();
MetricsSystem.clientCounter(ClientMetrics.BYTES_READ_LOCAL).inc(dataBuffer.getLength());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of incrementing the metrics here, we could have a wrapper around PacketReaders that handles metrics. We would need to expose the metric name as part of the PacketReader API though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's different: the metric here collects the short-circuit read, but not the read from remote.
Also, I don't like the API to be modified for metrics collection.

return dataBuffer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class Factory {
private Factory() {} // prevent instantiation

public static FileSystem get() {
return get(FileSystemContext.INSTANCE);
return get(FileSystemContext.get());
}

public static FileSystem get(FileSystemContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@
import alluxio.PropertyKey;
import alluxio.client.block.BlockMasterClient;
import alluxio.client.block.BlockMasterClientPool;
import alluxio.client.metrics.ClientMasterSync;
import alluxio.client.metrics.MetricsMasterClient;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.status.UnavailableException;
import alluxio.heartbeat.HeartbeatContext;
import alluxio.heartbeat.HeartbeatThread;
import alluxio.master.MasterClientConfig;
import alluxio.master.MasterInquireClient;
import alluxio.metrics.MetricsSystem;
import alluxio.network.netty.NettyChannelPool;
import alluxio.network.netty.NettyClient;
import alluxio.resource.CloseableResource;
import alluxio.util.CommonUtils;
import alluxio.util.IdUtils;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.ThreadUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
Expand All @@ -40,6 +48,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.concurrent.GuardedBy;
Expand All @@ -61,7 +71,7 @@
public final class FileSystemContext implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FileSystemContext.class);

public static final FileSystemContext INSTANCE = create();
private static FileSystemContext sInstance;

static {
MetricsSystem.startSinks();
Expand All @@ -75,6 +85,12 @@ public final class FileSystemContext implements Closeable {
// Closed flag for debugging information.
private final AtomicBoolean mClosed;

private ExecutorService mExecutorService;
private MetricsMasterClient mMetricsMasterClient;
private ClientMasterSync mClientMasterSync;

private final long mId;

// The netty data server channel pools.
private final ConcurrentHashMap<SocketAddress, NettyChannelPool>
mNettyChannelPools = new ConcurrentHashMap<>();
Expand All @@ -98,10 +114,24 @@ public final class FileSystemContext implements Closeable {
/** The parent user associated with the {@link FileSystemContext}. */
private final Subject mParentSubject;

/**
* @return the instance of file system context
*/
public static FileSystemContext get() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are changing this to get, should we do the same for the other contexts which use INSTANCE?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

INSTANCE also works for most cases. This method is a bit special as we want to defer its instantation for better unit testing.

if (sInstance == null) {
synchronized (FileSystemContext.class) {
if (sInstance == null) {
sInstance = create();
}
}
}
return sInstance;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not safe, sInstance can be set multiple times?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

}

/**
* @return the context
*/
public static FileSystemContext create() {
private static FileSystemContext create() {
return create(null);
}

Expand Down Expand Up @@ -133,6 +163,13 @@ public static FileSystemContext create(Subject subject, MasterInquireClient mast
*/
private FileSystemContext(Subject subject) {
mParentSubject = subject;
mExecutorService = Executors.newFixedThreadPool(1,
ThreadFactoryUtils.build("metrics-master-heartbeat-%d", true));
mId = IdUtils.createFileSystemContextId();
LOG.info("Created filesystem context with id {}."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we guard this with USER_METRICS_COLLECTION_ENABLED?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have to. This ID can be used for other purposes, too.

+ " This ID will be used for identifying the information "
+ "aggregated by the master, such as metrics",
mId);
mClosed = new AtomicBoolean(false);
}

Expand All @@ -147,6 +184,20 @@ private synchronized void init(MasterInquireClient masterInquireClient) {
new FileSystemMasterClientPool(mParentSubject, mMasterInquireClient);
mBlockMasterClientPool = new BlockMasterClientPool(mParentSubject, mMasterInquireClient);
mClosed.set(false);

if (Configuration.getBoolean(PropertyKey.USER_METRICS_COLLECTION_ENABLED)) {
// setup metrics master client sync
mMetricsMasterClient = new MetricsMasterClient(MasterClientConfig.defaults()
.withSubject(mParentSubject).withMasterInquireClient(mMasterInquireClient));
mClientMasterSync = new ClientMasterSync(mMetricsMasterClient, this);
mExecutorService = Executors.newFixedThreadPool(1,
ThreadFactoryUtils.build("metrics-master-heartbeat-%d", true));
mExecutorService
.submit(new HeartbeatThread(HeartbeatContext.MASTER_METRICS_SYNC, mClientMasterSync,
(int) Configuration.getMs(PropertyKey.USER_METRICS_HEARTBEAT_INTERVAL_MS)));
// register the shutdown hook
Runtime.getRuntime().addShutdownHook(new MetricsMasterSyncShutDownHook());
}
}

/**
Expand All @@ -169,6 +220,12 @@ public void close() throws IOException {
mNettyChannelPools.clear();

synchronized (this) {
if (Configuration.getBoolean(PropertyKey.USER_METRICS_COLLECTION_ENABLED)) {
ThreadUtils.shutdownAndAwaitTermination(mExecutorService);
mMetricsMasterClient.close();
mMetricsMasterClient = null;
mClientMasterSync = null;
}
mLocalWorkerInitialized = false;
mLocalWorker = null;
mClosed.set(true);
Expand All @@ -184,6 +241,13 @@ public synchronized void reset() throws IOException {
init(MasterInquireClient.Factory.create());
}

/**
* @return the unique id of the context
*/
public long getId() {
return mId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way to determine the ID of a process, how will users be able to map an ID to a process after seeing the metrics be abnormal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered this, but there does not seem a good approach commonly agreed on: https://stackoverflow.com/questions/35842/how-can-a-java-program-get-its-own-process-id?utm_medium=organic&utm_source=google_rich_qa&utm_campaign=google_rich_qa

We could log the ID after it's created.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging sounds good for now.

}

/**
* @return the parent subject
*/
Expand Down Expand Up @@ -349,6 +413,20 @@ private List<WorkerNetAddress> getWorkerAddresses() throws IOException {
return localWorkerNetAddresses.isEmpty() ? workerNetAddresses : localWorkerNetAddresses;
}

/**
* Class that heartbeats to the metrics master before exit.
*/
private final class MetricsMasterSyncShutDownHook extends Thread {
@Override
public void run() {
try {
mClientMasterSync.heartbeat();
} catch (InterruptedException e) {
LOG.error("Failed to heartbeat to the metrics master before exit");
}
}
}

/**
* Class that contains metrics about FileSystemContext.
*/
Expand All @@ -360,7 +438,7 @@ private static void initializeGauges() {
@Override
public Long getValue() {
long ret = 0;
for (NettyChannelPool pool : INSTANCE.mNettyChannelPools.values()) {
for (NettyChannelPool pool : get().mNettyChannelPools.values()) {
ret += pool.size();
}
return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public static boolean waitCompleted(final FileSystem fs, final AlluxioURI uri,
* @param uri the uri of the file to persist
*/
public static void persistFile(final FileSystem fs, final AlluxioURI uri) throws IOException {
FileSystemContext context = FileSystemContext.INSTANCE;
FileSystemContext context = FileSystemContext.get();
FileSystemMasterClient client = context.acquireMasterClient();
try {
client.scheduleAsyncPersist(uri);
Expand Down Expand Up @@ -164,7 +164,7 @@ public Boolean apply(Void input) {
*/
public static List<AlluxioURI> checkConsistency(AlluxioURI path, CheckConsistencyOptions options)
throws IOException {
FileSystemContext context = FileSystemContext.INSTANCE;
FileSystemContext context = FileSystemContext.get();
FileSystemMasterClient client = context.acquireMasterClient();
try {
return client.checkConsistency(path, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public final class DummyFileOutputStream extends FileOutStream {
* @param options the set of options specific to this operation
*/
public DummyFileOutputStream(AlluxioURI path, OutStreamOptions options) throws IOException {
super(path, options, FileSystemContext.INSTANCE);
super(path, options, FileSystemContext.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.metrics;

import alluxio.client.file.FileSystemContext;
import alluxio.heartbeat.HeartbeatExecutor;
import alluxio.metrics.Metric;
import alluxio.metrics.MetricsSystem;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import javax.annotation.concurrent.ThreadSafe;

/**
* Task that carries the client metrics information to master through heartheat. This class manages
* its own {@link MetricsMasterClient}.
*
* If the task fails to heartbeat to the master, it will destroy its old master client and recreate
* it before retrying.
*/
@ThreadSafe
public final class ClientMasterSync implements HeartbeatExecutor {
private static final Logger LOG = LoggerFactory.getLogger(ClientMasterSync.class);

/** Client for communicating to metrics master. */
private final MetricsMasterClient mMasterClient;
private final FileSystemContext mContext;

/**
* Constructs a new {@link ClientMasterSync}.
*
* @param masterClient the master client
* @param context the filesystem context
*/
public ClientMasterSync(MetricsMasterClient masterClient, FileSystemContext context) {
mMasterClient = Preconditions.checkNotNull(masterClient, "masterClient");
mContext = Preconditions.checkNotNull(context, "context");
}

@Override
public void heartbeat() throws InterruptedException {
List<alluxio.thrift.Metric> metrics = new ArrayList<>();
for (Metric metric : MetricsSystem.allClientMetrics()) {
metric.setInstanceId(Long.toString(mContext.getId()));
metrics.add(metric.toThrift());
}
try {
mMasterClient.heartbeat(metrics);
} catch (IOException e) {
// An error occurred, log and ignore it or error if heartbeat timeout is reached
LOG.error("Failed to heartbeat to the metrics master: {}", e);
mMasterClient.disconnect();
}
}

@Override
public void close() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.metrics;

import alluxio.AbstractMasterClient;
import alluxio.Constants;
import alluxio.client.file.FileSystemContext;
import alluxio.master.MasterClientConfig;
import alluxio.thrift.AlluxioService.Client;
import alluxio.thrift.Metric;
import alluxio.thrift.MetricsHeartbeatTOptions;
import alluxio.thrift.MetricsMasterClientService;
import alluxio.util.network.NetworkAddressUtils;

import org.apache.thrift.TException;

import java.io.IOException;
import java.util.List;

import javax.annotation.concurrent.ThreadSafe;

/**
* A client to use for interacting with a metrics master.
*/
@ThreadSafe
public class MetricsMasterClient extends AbstractMasterClient {
private MetricsMasterClientService.Client mClient = null;

/**
* Creates a new metrics master client.
*
* @param conf master client configuration
*/
public MetricsMasterClient(MasterClientConfig conf) {
super(conf);
}

@Override
protected Client getClient() {
return mClient;
}

@Override
protected String getServiceName() {
return Constants.METRICS_MASTER_CLIENT_SERVICE_NAME;
}

@Override
protected long getServiceVersion() {
return Constants.METRICS_MASTER_CLIENT_SERVICE_VERSION;
}

@Override
protected void afterConnect() {
mClient = new MetricsMasterClientService.Client(mProtocol);
}

/**
* The method the worker should periodically execute to heartbeat back to the master.
*
* @param metrics a list of client metrics
*/
public synchronized void heartbeat(final List<Metric> metrics) throws IOException {
retryRPC(new RpcCallable<Void>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nested retries (retryRPC and heartbeatThread) may cause unneeded complexity?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to be consistent with other heartbeat invocations. For example, in BlockMasterClient, we also retry the heartbeat RPC. But I'm also okay not to have retry since the heartbeat would retry anyway in the next beat.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with addressing this later in a general refactor, could you open a ticket?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Override
public Void call() throws TException {
mClient.metricsHeartbeat(Long.toString(FileSystemContext.get().getId()),
NetworkAddressUtils.getClientHostName(), new MetricsHeartbeatTOptions(metrics));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client id is present in each metric as well as the metricsHeartbeat call?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean? The client id is carried in the metric sent over to the metrics master as an identifier.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client id is in the RPC and in each of the metrics objects, is it necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. It's needed so when no metric is sent, the master still knows which client their heartbeat is from.

return null;
}
});
}
}
Loading