diff --git a/core/client/fs/src/main/java/alluxio/client/block/AlluxioBlockStore.java b/core/client/fs/src/main/java/alluxio/client/block/AlluxioBlockStore.java index 08c4fc332925..9b9ce5517521 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/AlluxioBlockStore.java +++ b/core/client/fs/src/main/java/alluxio/client/block/AlluxioBlockStore.java @@ -69,7 +69,7 @@ public final class AlluxioBlockStore { * @return the {@link AlluxioBlockStore} created */ public static AlluxioBlockStore create() { - return create(FileSystemContext.INSTANCE); + return create(FileSystemContext.get()); } /** diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/LocalFilePacketReader.java b/core/client/fs/src/main/java/alluxio/client/block/stream/LocalFilePacketReader.java index 321a3154a291..35c15b3eb63c 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/LocalFilePacketReader.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/LocalFilePacketReader.java @@ -15,6 +15,8 @@ import alluxio.PropertyKey; import alluxio.client.file.FileSystemContext; import alluxio.client.file.options.InStreamOptions; +import alluxio.metrics.ClientMetrics; +import alluxio.metrics.MetricsSystem; import alluxio.network.netty.NettyRPC; import alluxio.network.netty.NettyRPCContext; import alluxio.network.protocol.databuffer.DataBuffer; @@ -69,6 +71,7 @@ public DataBuffer readPacket() throws IOException { ByteBuffer buffer = mReader.read(mPos, Math.min(mPacketSize, mEnd - mPos)); DataBuffer dataBuffer = new DataByteBuffer(buffer, buffer.remaining()); mPos += dataBuffer.getLength(); + MetricsSystem.clientCounter(ClientMetrics.BYTES_READ_LOCAL).inc(dataBuffer.getLength()); return dataBuffer; } diff --git a/core/client/fs/src/main/java/alluxio/client/file/FileSystem.java b/core/client/fs/src/main/java/alluxio/client/file/FileSystem.java index 375089291f0f..e3a5917a3798 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/FileSystem.java +++ b/core/client/fs/src/main/java/alluxio/client/file/FileSystem.java @@ -65,7 +65,7 @@ class Factory { private Factory() {} // prevent instantiation public static FileSystem get() { - return get(FileSystemContext.INSTANCE); + return get(FileSystemContext.get()); } public static FileSystem get(FileSystemContext context) { diff --git a/core/client/fs/src/main/java/alluxio/client/file/FileSystemContext.java b/core/client/fs/src/main/java/alluxio/client/file/FileSystemContext.java index 85bf0484ffbf..8504be2aff2f 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/FileSystemContext.java +++ b/core/client/fs/src/main/java/alluxio/client/file/FileSystemContext.java @@ -15,14 +15,22 @@ import alluxio.PropertyKey; import alluxio.client.block.BlockMasterClient; import alluxio.client.block.BlockMasterClientPool; +import alluxio.client.metrics.ClientMasterSync; +import alluxio.client.metrics.MetricsMasterClient; import alluxio.exception.ExceptionMessage; import alluxio.exception.status.UnavailableException; +import alluxio.heartbeat.HeartbeatContext; +import alluxio.heartbeat.HeartbeatThread; +import alluxio.master.MasterClientConfig; import alluxio.master.MasterInquireClient; import alluxio.metrics.MetricsSystem; import alluxio.network.netty.NettyChannelPool; import alluxio.network.netty.NettyClient; import alluxio.resource.CloseableResource; import alluxio.util.CommonUtils; +import alluxio.util.IdUtils; +import alluxio.util.ThreadFactoryUtils; +import alluxio.util.ThreadUtils; import alluxio.util.network.NetworkAddressUtils; import alluxio.wire.WorkerInfo; import alluxio.wire.WorkerNetAddress; @@ -40,6 +48,8 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.concurrent.GuardedBy; @@ -61,7 +71,7 @@ public final class FileSystemContext implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(FileSystemContext.class); - public static final FileSystemContext INSTANCE = create(); + private static FileSystemContext sInstance; static { MetricsSystem.startSinks(); @@ -75,6 +85,12 @@ public final class FileSystemContext implements Closeable { // Closed flag for debugging information. private final AtomicBoolean mClosed; + private ExecutorService mExecutorService; + private MetricsMasterClient mMetricsMasterClient; + private ClientMasterSync mClientMasterSync; + + private final long mId; + // The netty data server channel pools. private final ConcurrentHashMap mNettyChannelPools = new ConcurrentHashMap<>(); @@ -98,10 +114,24 @@ public final class FileSystemContext implements Closeable { /** The parent user associated with the {@link FileSystemContext}. */ private final Subject mParentSubject; + /** + * @return the instance of file system context + */ + public static FileSystemContext get() { + if (sInstance == null) { + synchronized (FileSystemContext.class) { + if (sInstance == null) { + sInstance = create(); + } + } + } + return sInstance; + } + /** * @return the context */ - public static FileSystemContext create() { + private static FileSystemContext create() { return create(null); } @@ -133,6 +163,13 @@ public static FileSystemContext create(Subject subject, MasterInquireClient mast */ private FileSystemContext(Subject subject) { mParentSubject = subject; + mExecutorService = Executors.newFixedThreadPool(1, + ThreadFactoryUtils.build("metrics-master-heartbeat-%d", true)); + mId = IdUtils.createFileSystemContextId(); + LOG.info("Created filesystem context with id {}." + + " This ID will be used for identifying the information " + + "aggregated by the master, such as metrics", + mId); mClosed = new AtomicBoolean(false); } @@ -147,6 +184,20 @@ private synchronized void init(MasterInquireClient masterInquireClient) { new FileSystemMasterClientPool(mParentSubject, mMasterInquireClient); mBlockMasterClientPool = new BlockMasterClientPool(mParentSubject, mMasterInquireClient); mClosed.set(false); + + if (Configuration.getBoolean(PropertyKey.USER_METRICS_COLLECTION_ENABLED)) { + // setup metrics master client sync + mMetricsMasterClient = new MetricsMasterClient(MasterClientConfig.defaults() + .withSubject(mParentSubject).withMasterInquireClient(mMasterInquireClient)); + mClientMasterSync = new ClientMasterSync(mMetricsMasterClient, this); + mExecutorService = Executors.newFixedThreadPool(1, + ThreadFactoryUtils.build("metrics-master-heartbeat-%d", true)); + mExecutorService + .submit(new HeartbeatThread(HeartbeatContext.MASTER_METRICS_SYNC, mClientMasterSync, + (int) Configuration.getMs(PropertyKey.USER_METRICS_HEARTBEAT_INTERVAL_MS))); + // register the shutdown hook + Runtime.getRuntime().addShutdownHook(new MetricsMasterSyncShutDownHook()); + } } /** @@ -169,6 +220,12 @@ public void close() throws IOException { mNettyChannelPools.clear(); synchronized (this) { + if (Configuration.getBoolean(PropertyKey.USER_METRICS_COLLECTION_ENABLED)) { + ThreadUtils.shutdownAndAwaitTermination(mExecutorService); + mMetricsMasterClient.close(); + mMetricsMasterClient = null; + mClientMasterSync = null; + } mLocalWorkerInitialized = false; mLocalWorker = null; mClosed.set(true); @@ -184,6 +241,13 @@ public synchronized void reset() throws IOException { init(MasterInquireClient.Factory.create()); } + /** + * @return the unique id of the context + */ + public long getId() { + return mId; + } + /** * @return the parent subject */ @@ -349,6 +413,20 @@ private List getWorkerAddresses() throws IOException { return localWorkerNetAddresses.isEmpty() ? workerNetAddresses : localWorkerNetAddresses; } + /** + * Class that heartbeats to the metrics master before exit. + */ + private final class MetricsMasterSyncShutDownHook extends Thread { + @Override + public void run() { + try { + mClientMasterSync.heartbeat(); + } catch (InterruptedException e) { + LOG.error("Failed to heartbeat to the metrics master before exit"); + } + } + } + /** * Class that contains metrics about FileSystemContext. */ @@ -360,7 +438,7 @@ private static void initializeGauges() { @Override public Long getValue() { long ret = 0; - for (NettyChannelPool pool : INSTANCE.mNettyChannelPools.values()) { + for (NettyChannelPool pool : get().mNettyChannelPools.values()) { ret += pool.size(); } return ret; diff --git a/core/client/fs/src/main/java/alluxio/client/file/FileSystemUtils.java b/core/client/fs/src/main/java/alluxio/client/file/FileSystemUtils.java index 6da189e0f3ed..c37fee40eb6b 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/FileSystemUtils.java +++ b/core/client/fs/src/main/java/alluxio/client/file/FileSystemUtils.java @@ -133,7 +133,7 @@ public static boolean waitCompleted(final FileSystem fs, final AlluxioURI uri, * @param uri the uri of the file to persist */ public static void persistFile(final FileSystem fs, final AlluxioURI uri) throws IOException { - FileSystemContext context = FileSystemContext.INSTANCE; + FileSystemContext context = FileSystemContext.get(); FileSystemMasterClient client = context.acquireMasterClient(); try { client.scheduleAsyncPersist(uri); @@ -164,7 +164,7 @@ public Boolean apply(Void input) { */ public static List checkConsistency(AlluxioURI path, CheckConsistencyOptions options) throws IOException { - FileSystemContext context = FileSystemContext.INSTANCE; + FileSystemContext context = FileSystemContext.get(); FileSystemMasterClient client = context.acquireMasterClient(); try { return client.checkConsistency(path, options); diff --git a/core/client/fs/src/main/java/alluxio/client/lineage/DummyFileOutputStream.java b/core/client/fs/src/main/java/alluxio/client/lineage/DummyFileOutputStream.java index 2da5e7598859..bd141f379920 100644 --- a/core/client/fs/src/main/java/alluxio/client/lineage/DummyFileOutputStream.java +++ b/core/client/fs/src/main/java/alluxio/client/lineage/DummyFileOutputStream.java @@ -34,7 +34,7 @@ public final class DummyFileOutputStream extends FileOutStream { * @param options the set of options specific to this operation */ public DummyFileOutputStream(AlluxioURI path, OutStreamOptions options) throws IOException { - super(path, options, FileSystemContext.INSTANCE); + super(path, options, FileSystemContext.get()); } @Override diff --git a/core/client/fs/src/main/java/alluxio/client/metrics/ClientMasterSync.java b/core/client/fs/src/main/java/alluxio/client/metrics/ClientMasterSync.java new file mode 100644 index 000000000000..d19b205739c3 --- /dev/null +++ b/core/client/fs/src/main/java/alluxio/client/metrics/ClientMasterSync.java @@ -0,0 +1,74 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.metrics; + +import alluxio.client.file.FileSystemContext; +import alluxio.heartbeat.HeartbeatExecutor; +import alluxio.metrics.Metric; +import alluxio.metrics.MetricsSystem; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * Task that carries the client metrics information to master through heartheat. This class manages + * its own {@link MetricsMasterClient}. + * + * If the task fails to heartbeat to the master, it will destroy its old master client and recreate + * it before retrying. + */ +@ThreadSafe +public final class ClientMasterSync implements HeartbeatExecutor { + private static final Logger LOG = LoggerFactory.getLogger(ClientMasterSync.class); + + /** Client for communicating to metrics master. */ + private final MetricsMasterClient mMasterClient; + private final FileSystemContext mContext; + + /** + * Constructs a new {@link ClientMasterSync}. + * + * @param masterClient the master client + * @param context the filesystem context + */ + public ClientMasterSync(MetricsMasterClient masterClient, FileSystemContext context) { + mMasterClient = Preconditions.checkNotNull(masterClient, "masterClient"); + mContext = Preconditions.checkNotNull(context, "context"); + } + + @Override + public void heartbeat() throws InterruptedException { + List metrics = new ArrayList<>(); + for (Metric metric : MetricsSystem.allClientMetrics()) { + metric.setInstanceId(Long.toString(mContext.getId())); + metrics.add(metric.toThrift()); + } + try { + mMasterClient.heartbeat(metrics); + } catch (IOException e) { + // An error occurred, log and ignore it or error if heartbeat timeout is reached + LOG.error("Failed to heartbeat to the metrics master: {}", e); + mMasterClient.disconnect(); + } + } + + @Override + public void close() { + } +} diff --git a/core/client/fs/src/main/java/alluxio/client/metrics/MetricsMasterClient.java b/core/client/fs/src/main/java/alluxio/client/metrics/MetricsMasterClient.java new file mode 100644 index 000000000000..0a9f43e7864e --- /dev/null +++ b/core/client/fs/src/main/java/alluxio/client/metrics/MetricsMasterClient.java @@ -0,0 +1,82 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.metrics; + +import alluxio.AbstractMasterClient; +import alluxio.Constants; +import alluxio.client.file.FileSystemContext; +import alluxio.master.MasterClientConfig; +import alluxio.thrift.AlluxioService.Client; +import alluxio.thrift.Metric; +import alluxio.thrift.MetricsHeartbeatTOptions; +import alluxio.thrift.MetricsMasterClientService; +import alluxio.util.network.NetworkAddressUtils; + +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.List; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * A client to use for interacting with a metrics master. + */ +@ThreadSafe +public class MetricsMasterClient extends AbstractMasterClient { + private MetricsMasterClientService.Client mClient = null; + + /** + * Creates a new metrics master client. + * + * @param conf master client configuration + */ + public MetricsMasterClient(MasterClientConfig conf) { + super(conf); + } + + @Override + protected Client getClient() { + return mClient; + } + + @Override + protected String getServiceName() { + return Constants.METRICS_MASTER_CLIENT_SERVICE_NAME; + } + + @Override + protected long getServiceVersion() { + return Constants.METRICS_MASTER_CLIENT_SERVICE_VERSION; + } + + @Override + protected void afterConnect() { + mClient = new MetricsMasterClientService.Client(mProtocol); + } + + /** + * The method the worker should periodically execute to heartbeat back to the master. + * + * @param metrics a list of client metrics + */ + public synchronized void heartbeat(final List metrics) throws IOException { + retryRPC(new RpcCallable() { + @Override + public Void call() throws TException { + mClient.metricsHeartbeat(Long.toString(FileSystemContext.get().getId()), + NetworkAddressUtils.getClientHostName(), new MetricsHeartbeatTOptions(metrics)); + return null; + } + }); + } +} diff --git a/core/client/fs/src/test/java/alluxio/client/file/FileSystemContextTest.java b/core/client/fs/src/test/java/alluxio/client/file/FileSystemContextTest.java index 9665168a0e8a..02ebc482cdc7 100644 --- a/core/client/fs/src/test/java/alluxio/client/file/FileSystemContextTest.java +++ b/core/client/fs/src/test/java/alluxio/client/file/FileSystemContextTest.java @@ -37,7 +37,7 @@ public void acquireAtMaxLimit() throws Exception { // Acquire all the clients for (int i = 0; i < Configuration.getInt(PropertyKey.USER_FILE_MASTER_CLIENT_THREADS); i++) { - clients.add(FileSystemContext.INSTANCE.acquireMasterClient()); + clients.add(FileSystemContext.get().acquireMasterClient()); } Thread acquireThread = new Thread(new AcquireClient()); acquireThread.start(); @@ -53,7 +53,7 @@ public void acquireAtMaxLimit() throws Exception { // Release all the clients for (FileSystemMasterClient client : clients) { - FileSystemContext.INSTANCE.releaseMasterClient(client); + FileSystemContext.get().releaseMasterClient(client); } // Wait for the spawned thread to complete. If it is unable to acquire a master client before @@ -69,8 +69,8 @@ public void acquireAtMaxLimit() throws Exception { class AcquireClient implements Runnable { @Override public void run() { - FileSystemMasterClient client = FileSystemContext.INSTANCE.acquireMasterClient(); - FileSystemContext.INSTANCE.releaseMasterClient(client); + FileSystemMasterClient client = FileSystemContext.get().acquireMasterClient(); + FileSystemContext.get().releaseMasterClient(client); } } } diff --git a/core/client/fs/src/test/java/alluxio/client/file/MockFileOutStream.java b/core/client/fs/src/test/java/alluxio/client/file/MockFileOutStream.java index a5834845f8f4..728f6d0ab361 100644 --- a/core/client/fs/src/test/java/alluxio/client/file/MockFileOutStream.java +++ b/core/client/fs/src/test/java/alluxio/client/file/MockFileOutStream.java @@ -29,7 +29,7 @@ public final class MockFileOutStream extends FileOutStream { * inspection during tests. */ public MockFileOutStream() throws IOException { - super(new AlluxioURI("/"), OutStreamOptions.defaults(), FileSystemContext.INSTANCE); + super(new AlluxioURI("/"), OutStreamOptions.defaults(), FileSystemContext.get()); mStream = new ByteArrayOutputStream(); } diff --git a/core/client/fs/src/test/java/alluxio/client/util/ClientTestUtils.java b/core/client/fs/src/test/java/alluxio/client/util/ClientTestUtils.java index eafc494b309b..057b0f77a408 100644 --- a/core/client/fs/src/test/java/alluxio/client/util/ClientTestUtils.java +++ b/core/client/fs/src/test/java/alluxio/client/util/ClientTestUtils.java @@ -45,7 +45,7 @@ public static void resetClient() { } private static void resetContexts() throws IOException { - FileSystemContext.INSTANCE.reset(); + FileSystemContext.get().reset(); LineageContext.INSTANCE.reset(); } } diff --git a/core/client/hdfs/src/main/java/alluxio/hadoop/AbstractFileSystem.java b/core/client/hdfs/src/main/java/alluxio/hadoop/AbstractFileSystem.java index fec9eaf66b7b..1729e11ea0cb 100644 --- a/core/client/hdfs/src/main/java/alluxio/hadoop/AbstractFileSystem.java +++ b/core/client/hdfs/src/main/java/alluxio/hadoop/AbstractFileSystem.java @@ -142,7 +142,7 @@ public void close() throws IOException { // org.apache.hadoop.fs.FileSystem.close may check the existence of certain temp files before // closing super.close(); - if (mContext != null && mContext != FileSystemContext.INSTANCE) { + if (mContext != null && mContext != FileSystemContext.get()) { mContext.close(); } } @@ -473,7 +473,7 @@ public void initialize(URI uri, org.apache.hadoop.conf.Configuration conf) throw } else { LOG.warn(ExceptionMessage.DIFFERENT_MASTER_ADDRESS .getMessage(mUri.getHost() + ":" + mUri.getPort(), - FileSystemContext.INSTANCE.getMasterAddress())); + FileSystemContext.get().getMasterAddress())); sInitialized = false; } } @@ -510,15 +510,15 @@ void initializeInternal(URI uri, org.apache.hadoop.conf.Configuration conf) thro // These must be reset to pick up the change to the master address. // TODO(andrew): We should reset key value system in this situation - see ALLUXIO-1706. LineageContext.INSTANCE.reset(); - FileSystemContext.INSTANCE.reset(); + FileSystemContext.get().reset(); // Try to connect to master, if it fails, the provided uri is invalid. - FileSystemMasterClient client = FileSystemContext.INSTANCE.acquireMasterClient(); + FileSystemMasterClient client = FileSystemContext.get().acquireMasterClient(); try { client.connect(); // Connected, initialize. } finally { - FileSystemContext.INSTANCE.releaseMasterClient(client); + FileSystemContext.get().releaseMasterClient(client); } } @@ -531,7 +531,7 @@ private void updateFileSystemAndContext() { mContext = FileSystemContext.create(subject); mFileSystem = FileSystem.Factory.get(mContext); } else { - mContext = FileSystemContext.INSTANCE; + mContext = FileSystemContext.get(); mFileSystem = FileSystem.Factory.get(); } } @@ -543,7 +543,7 @@ private void updateFileSystemAndContext() { private boolean checkMasterAddress() { InetSocketAddress masterAddress = null; try { - masterAddress = FileSystemContext.INSTANCE.getMasterAddress(); + masterAddress = FileSystemContext.get().getMasterAddress(); } catch (UnavailableException e) { LOG.warn("Failed to determine master RPC address: {}", e.toString()); return false; diff --git a/core/client/hdfs/src/test/java/alluxio/hadoop/AbstractFileSystemTest.java b/core/client/hdfs/src/test/java/alluxio/hadoop/AbstractFileSystemTest.java index 3e60b90886d4..9cff32732cb8 100644 --- a/core/client/hdfs/src/test/java/alluxio/hadoop/AbstractFileSystemTest.java +++ b/core/client/hdfs/src/test/java/alluxio/hadoop/AbstractFileSystemTest.java @@ -13,15 +13,15 @@ import static java.util.stream.Collectors.toList; import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import alluxio.AlluxioURI; import alluxio.ConfigurationRule; @@ -508,9 +508,10 @@ private void mockFileSystemContextAndMasterClient() throws Exception { mMockFileSystemContext = PowerMockito.mock(FileSystemContext.class); mMockFileSystemContextCustomized = PowerMockito.mock(FileSystemContext.class); PowerMockito.mockStatic(FileSystemContext.class); - Whitebox.setInternalState(FileSystemContext.class, "INSTANCE", mMockFileSystemContext); + Whitebox.setInternalState(FileSystemContext.class, "sInstance", mMockFileSystemContext); PowerMockito.when(FileSystemContext.create(any(Subject.class))) .thenReturn(mMockFileSystemContextCustomized); + PowerMockito.when(FileSystemContext.get()).thenReturn(mMockFileSystemContext); mMockFileSystemMasterClient = mock(FileSystemMasterClient.class); when(mMockFileSystemContext.acquireMasterClient()) .thenReturn(mMockFileSystemMasterClient); diff --git a/core/client/hdfs/src/test/java/alluxio/hadoop/HadoopClientTestUtils.java b/core/client/hdfs/src/test/java/alluxio/hadoop/HadoopClientTestUtils.java index 2f4dcf4c829e..46dfdd1b4a80 100644 --- a/core/client/hdfs/src/test/java/alluxio/hadoop/HadoopClientTestUtils.java +++ b/core/client/hdfs/src/test/java/alluxio/hadoop/HadoopClientTestUtils.java @@ -33,7 +33,7 @@ public final class HadoopClientTestUtils { */ public static void resetClient() { try { - FileSystemContext.INSTANCE.reset(); + FileSystemContext.get().reset(); LineageContext.INSTANCE.reset(); Whitebox.setInternalState(AbstractFileSystem.class, "sInitialized", false); } catch (Exception e) { diff --git a/core/common/src/main/java/alluxio/Constants.java b/core/common/src/main/java/alluxio/Constants.java index 2dbec345eabe..81f2a79d6528 100644 --- a/core/common/src/main/java/alluxio/Constants.java +++ b/core/common/src/main/java/alluxio/Constants.java @@ -84,6 +84,7 @@ public final class Constants { public static final long FILE_SYSTEM_WORKER_CLIENT_SERVICE_VERSION = 2; public static final long LINEAGE_MASTER_CLIENT_SERVICE_VERSION = 2; public static final long META_MASTER_CLIENT_SERVICE_VERSION = 2; + public static final long METRICS_MASTER_CLIENT_SERVICE_VERSION = 2; public static final long KEY_VALUE_MASTER_CLIENT_SERVICE_VERSION = 2; public static final long KEY_VALUE_WORKER_SERVICE_VERSION = 2; public static final long UNKNOWN_SERVICE_VERSION = -1; @@ -103,6 +104,7 @@ public final class Constants { public static final String FILE_SYSTEM_MASTER_WORKER_SERVICE_NAME = "FileSystemMasterWorker"; public static final String LINEAGE_MASTER_CLIENT_SERVICE_NAME = "LineageMasterClient"; public static final String META_MASTER_SERVICE_NAME = "MetaMaster"; + public static final String METRICS_MASTER_CLIENT_SERVICE_NAME = "MetricsMasterClient"; public static final String BLOCK_WORKER_CLIENT_SERVICE_NAME = "BlockWorkerClient"; public static final String FILE_SYSTEM_WORKER_CLIENT_SERVICE_NAME = "FileSystemWorkerClient"; public static final String KEY_VALUE_MASTER_CLIENT_SERVICE_NAME = "KeyValueMasterClient"; diff --git a/core/common/src/main/java/alluxio/PropertyKey.java b/core/common/src/main/java/alluxio/PropertyKey.java index 7efaf71684c2..661815dbd99c 100644 --- a/core/common/src/main/java/alluxio/PropertyKey.java +++ b/core/common/src/main/java/alluxio/PropertyKey.java @@ -2369,6 +2369,22 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.CLIENT) .build(); + public static final PropertyKey USER_METRICS_COLLECTION_ENABLED = + new Builder(Name.USER_METRICS_COLLECTION_ENABLED) + .setDefaultValue(true) + .setDescription("Enable collecting the client-side metrics and hearbeat them to master") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.CLIENT) + .build(); + public static final PropertyKey USER_METRICS_HEARTBEAT_INTERVAL_MS = + new Builder(Name.USER_METRICS_HEARTBEAT_INTERVAL_MS) + .setAlias(new String[]{"alluxio.user.heartbeat.interval.ms"}) + .setDefaultValue("3sec") + .setDescription("The time period of client master hearbeat to " + + "send the client-side metrics.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.CLIENT) + .build(); public static final PropertyKey USER_NETWORK_NETTY_CHANNEL = new Builder(Name.USER_NETWORK_NETTY_CHANNEL) .setDescription("Type of netty channels.") @@ -3285,6 +3301,10 @@ public static final class Name { "alluxio.user.local.reader.packet.size.bytes"; public static final String USER_LOCAL_WRITER_PACKET_SIZE_BYTES = "alluxio.user.local.writer.packet.size.bytes"; + public static final String USER_METRICS_COLLECTION_ENABLED = + "alluxio.user.metrics.collection.enabled"; + public static final String USER_METRICS_HEARTBEAT_INTERVAL_MS = + "alluxio.user.metrics.heartbeat.interval"; public static final String USER_NETWORK_NETTY_CHANNEL = "alluxio.user.network.netty.channel"; public static final String USER_NETWORK_NETTY_TIMEOUT_MS = "alluxio.user.network.netty.timeout"; diff --git a/core/common/src/main/java/alluxio/heartbeat/HeartbeatContext.java b/core/common/src/main/java/alluxio/heartbeat/HeartbeatContext.java index 2e4998cd6f36..b740a9fa02a9 100644 --- a/core/common/src/main/java/alluxio/heartbeat/HeartbeatContext.java +++ b/core/common/src/main/java/alluxio/heartbeat/HeartbeatContext.java @@ -34,6 +34,7 @@ public final class HeartbeatContext { public static final String MASTER_FILE_RECOMPUTATION = "Master File Recomputation"; public static final String MASTER_LOST_FILES_DETECTION = "Master Lost Files Detection"; public static final String MASTER_LOST_WORKER_DETECTION = "Master Lost Worker Detection"; + public static final String MASTER_METRICS_SYNC = "Master Metrics Sync"; public static final String MASTER_TTL_CHECK = "Master TTL Check"; public static final String WORKER_BLOCK_SYNC = "Worker Block Sync"; public static final String WORKER_CLIENT = "Worker Client"; @@ -54,6 +55,7 @@ public final class HeartbeatContext { sTimerClasses.put(WORKER_CLIENT, SLEEPING_TIMER_CLASS); sTimerClasses.put(WORKER_PIN_LIST_SYNC, SLEEPING_TIMER_CLASS); sTimerClasses.put(WORKER_SPACE_RESERVER, SLEEPING_TIMER_CLASS); + sTimerClasses.put(MASTER_METRICS_SYNC, SLEEPING_TIMER_CLASS); } private HeartbeatContext() {} // to prevent initialization diff --git a/core/common/src/main/java/alluxio/metrics/ClientMetrics.java b/core/common/src/main/java/alluxio/metrics/ClientMetrics.java new file mode 100644 index 000000000000..9b1e57b6bc9c --- /dev/null +++ b/core/common/src/main/java/alluxio/metrics/ClientMetrics.java @@ -0,0 +1,22 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.metrics; + +/** + * Metrics of an Alluxio client. + */ +public final class ClientMetrics { + /** Total number of bytes short-circuit read from local storage. */ + public static final String BYTES_READ_LOCAL = "BytesReadLocal"; + + private ClientMetrics() {} // prevent instantiation +} diff --git a/core/common/src/main/java/alluxio/metrics/Metric.java b/core/common/src/main/java/alluxio/metrics/Metric.java index 48e20a94a5f5..20fd44629bf8 100644 --- a/core/common/src/main/java/alluxio/metrics/Metric.java +++ b/core/common/src/main/java/alluxio/metrics/Metric.java @@ -22,23 +22,40 @@ public final class Metric implements Serializable { private static final long serialVersionUID = -2236393414222298333L; - private final MetricsSystem.InstanceType mInstance; + private static final String ID_SEPARATOR = "-id:"; + private final MetricsSystem.InstanceType mInstanceType; private final String mHostname; private final String mName; private final Long mValue; + private String mInstanceId; /** * Constructs a {@link Metric} instance. * - * @param instance the instance + * @param instanceType the instance type * @param hostname the hostname * @param name the metric name * @param value the value */ - public Metric(MetricsSystem.InstanceType instance, String hostname, String name, Long value) { + public Metric(MetricsSystem.InstanceType instanceType, String hostname, String name, Long value) { + this(instanceType, hostname, null, name, value); + } + + /** + * Constructs a {@link Metric} instance. + * + * @param instanceType the instance type + * @param hostname the hostname + * @param id the instance id + * @param name the metric name + * @param value the value + */ + public Metric(MetricsSystem.InstanceType instanceType, String hostname, String id, String name, + Long value) { Preconditions.checkNotNull(name); - mInstance = instance; + mInstanceType = instanceType; mHostname = hostname; + mInstanceId = id; mName = name; mValue = value; } @@ -46,8 +63,8 @@ public Metric(MetricsSystem.InstanceType instance, String hostname, String name, /** * @return the instance type */ - public MetricsSystem.InstanceType getInstance() { - return mInstance; + public MetricsSystem.InstanceType getInstanceType() { + return mInstanceType; } /** @@ -71,6 +88,22 @@ public long getValue() { return mValue; } + /** + * @return the instance id + */ + public String getInstanceId() { + return mInstanceId; + } + + /** + * Sets the instance id. + * + * @param instanceId the instance id; + */ + public void setInstanceId(String instanceId) { + mInstanceId = instanceId; + } + @Override public boolean equals(Object other) { if (other == null) { @@ -80,24 +113,32 @@ public boolean equals(Object other) { return false; } Metric metric = (Metric) other; - return Objects.equal(mHostname, metric.mHostname) && Objects.equal(mInstance, metric.mInstance) - && Objects.equal(mName, metric.mName) && Objects.equal(mValue, metric.mValue); + return Objects.equal(mHostname, metric.mHostname) + && Objects.equal(mInstanceType, metric.mInstanceType) + && Objects.equal(mInstanceId, metric.mInstanceId) && Objects.equal(mName, metric.mName) + && Objects.equal(mValue, metric.mValue); } @Override public int hashCode() { - return Objects.hashCode(mHostname, mInstance, mValue, mName); + return Objects.hashCode(mHostname, mInstanceType, mInstanceId, mValue, mName); } /** - * @return the fully qualified metric name, which is of pattern instance.[hostname.].value + * @return the fully qualified metric name, which is of pattern + * instance.[hostname-id:instanceId.]value */ public String getFullMetricName() { StringBuilder sb = new StringBuilder(); - sb.append(mInstance).append('.'); + sb.append(mInstanceType).append('.'); if (mHostname != null) { - sb.append(mHostname).append('.'); + sb.append(mHostname); + if (mInstanceId != null) { + sb.append(ID_SEPARATOR).append(mInstanceId); + } + sb.append('.'); } + sb.append(mName); return sb.toString(); } @@ -107,9 +148,10 @@ public String getFullMetricName() { */ public alluxio.thrift.Metric toThrift() { alluxio.thrift.Metric metric = new alluxio.thrift.Metric(); - metric.setInstance(mInstance.toString()); + metric.setInstance(mInstanceType.toString()); metric.setHostname(mHostname); metric.setName(mName); + metric.setInstanceId(mInstanceId); metric.setValue(mValue); return metric; } @@ -126,13 +168,20 @@ public static Metric from(String fullName, long value) { Preconditions.checkArgument(pieces.length > 1, "Incorrect metrics name: %s.", fullName); String hostname = null; + String id = null; // Master or cluster metrics don't have hostname included. if (!pieces[0].equals(MetricsSystem.InstanceType.MASTER.toString())) { - hostname = pieces[1]; + if (pieces[1].contains(ID_SEPARATOR)) { + String[] ids = pieces[1].split(ID_SEPARATOR); + hostname = ids[0]; + id = ids[1]; + } else { + hostname = pieces[1]; + } } MetricsSystem.InstanceType instance = MetricsSystem.InstanceType.fromString(pieces[0]); String name = MetricsSystem.stripInstanceAndHost(fullName); - return new Metric(instance, hostname, name, value); + return new Metric(instance, hostname, id, name, value); } /** @@ -143,12 +192,13 @@ public static Metric from(String fullName, long value) { */ public static Metric from(alluxio.thrift.Metric metric) { return new Metric(MetricsSystem.InstanceType.fromString(metric.getInstance()), - metric.getHostname(), metric.getName(), metric.getValue()); + metric.getHostname(), metric.getInstanceId(), metric.getName(), metric.getValue()); } @Override public String toString() { - return Objects.toStringHelper(this).add("instance", mInstance).add("hostname", mHostname) - .add("name", mName).add("value", mValue).toString(); + return Objects.toStringHelper(this).add("instanceType", mInstanceType) + .add("hostname", mHostname).add("instanceId", mInstanceId).add("name", mName) + .add("value", mValue).toString(); } } diff --git a/core/common/src/main/java/alluxio/thrift/Metric.java b/core/common/src/main/java/alluxio/thrift/Metric.java index 12aa62ef438a..4aec2d4dfc6d 100644 --- a/core/common/src/main/java/alluxio/thrift/Metric.java +++ b/core/common/src/main/java/alluxio/thrift/Metric.java @@ -43,8 +43,9 @@ public class Metric implements org.apache.thrift.TBase, private static final org.apache.thrift.protocol.TField INSTANCE_FIELD_DESC = new org.apache.thrift.protocol.TField("instance", org.apache.thrift.protocol.TType.STRING, (short)1); private static final org.apache.thrift.protocol.TField HOSTNAME_FIELD_DESC = new org.apache.thrift.protocol.TField("hostname", org.apache.thrift.protocol.TType.STRING, (short)2); - private static final org.apache.thrift.protocol.TField NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("name", org.apache.thrift.protocol.TType.STRING, (short)3); - private static final org.apache.thrift.protocol.TField VALUE_FIELD_DESC = new org.apache.thrift.protocol.TField("value", org.apache.thrift.protocol.TType.I64, (short)4); + private static final org.apache.thrift.protocol.TField INSTANCE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("instanceId", org.apache.thrift.protocol.TType.STRING, (short)3); + private static final org.apache.thrift.protocol.TField NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("name", org.apache.thrift.protocol.TType.STRING, (short)4); + private static final org.apache.thrift.protocol.TField VALUE_FIELD_DESC = new org.apache.thrift.protocol.TField("value", org.apache.thrift.protocol.TType.I64, (short)5); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -54,6 +55,7 @@ public class Metric implements org.apache.thrift.TBase, private String instance; // required private String hostname; // required + private String instanceId; // required private String name; // required private long value; // required @@ -61,8 +63,9 @@ public class Metric implements org.apache.thrift.TBase, public enum _Fields implements org.apache.thrift.TFieldIdEnum { INSTANCE((short)1, "instance"), HOSTNAME((short)2, "hostname"), - NAME((short)3, "name"), - VALUE((short)4, "value"); + INSTANCE_ID((short)3, "instanceId"), + NAME((short)4, "name"), + VALUE((short)5, "value"); private static final Map byName = new HashMap(); @@ -81,9 +84,11 @@ public static _Fields findByThriftId(int fieldId) { return INSTANCE; case 2: // HOSTNAME return HOSTNAME; - case 3: // NAME + case 3: // INSTANCE_ID + return INSTANCE_ID; + case 4: // NAME return NAME; - case 4: // VALUE + case 5: // VALUE return VALUE; default: return null; @@ -134,6 +139,8 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.HOSTNAME, new org.apache.thrift.meta_data.FieldMetaData("hostname", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.INSTANCE_ID, new org.apache.thrift.meta_data.FieldMetaData("instanceId", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.NAME, new org.apache.thrift.meta_data.FieldMetaData("name", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.VALUE, new org.apache.thrift.meta_data.FieldMetaData("value", org.apache.thrift.TFieldRequirementType.DEFAULT, @@ -148,12 +155,14 @@ public Metric() { public Metric( String instance, String hostname, + String instanceId, String name, long value) { this(); this.instance = instance; this.hostname = hostname; + this.instanceId = instanceId; this.name = name; this.value = value; setValueIsSet(true); @@ -170,6 +179,9 @@ public Metric(Metric other) { if (other.isSetHostname()) { this.hostname = other.hostname; } + if (other.isSetInstanceId()) { + this.instanceId = other.instanceId; + } if (other.isSetName()) { this.name = other.name; } @@ -184,6 +196,7 @@ public Metric deepCopy() { public void clear() { this.instance = null; this.hostname = null; + this.instanceId = null; this.name = null; setValueIsSet(false); this.value = 0; @@ -237,6 +250,30 @@ public void setHostnameIsSet(boolean value) { } } + public String getInstanceId() { + return this.instanceId; + } + + public Metric setInstanceId(String instanceId) { + this.instanceId = instanceId; + return this; + } + + public void unsetInstanceId() { + this.instanceId = null; + } + + /** Returns true if field instanceId is set (has been assigned a value) and false otherwise */ + public boolean isSetInstanceId() { + return this.instanceId != null; + } + + public void setInstanceIdIsSet(boolean value) { + if (!value) { + this.instanceId = null; + } + } + public String getName() { return this.name; } @@ -302,6 +339,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case INSTANCE_ID: + if (value == null) { + unsetInstanceId(); + } else { + setInstanceId((String)value); + } + break; + case NAME: if (value == null) { unsetName(); @@ -329,6 +374,9 @@ public Object getFieldValue(_Fields field) { case HOSTNAME: return getHostname(); + case INSTANCE_ID: + return getInstanceId(); + case NAME: return getName(); @@ -350,6 +398,8 @@ public boolean isSet(_Fields field) { return isSetInstance(); case HOSTNAME: return isSetHostname(); + case INSTANCE_ID: + return isSetInstanceId(); case NAME: return isSetName(); case VALUE: @@ -389,6 +439,15 @@ public boolean equals(Metric that) { return false; } + boolean this_present_instanceId = true && this.isSetInstanceId(); + boolean that_present_instanceId = true && that.isSetInstanceId(); + if (this_present_instanceId || that_present_instanceId) { + if (!(this_present_instanceId && that_present_instanceId)) + return false; + if (!this.instanceId.equals(that.instanceId)) + return false; + } + boolean this_present_name = true && this.isSetName(); boolean that_present_name = true && that.isSetName(); if (this_present_name || that_present_name) { @@ -424,6 +483,11 @@ public int hashCode() { if (present_hostname) list.add(hostname); + boolean present_instanceId = true && (isSetInstanceId()); + list.add(present_instanceId); + if (present_instanceId) + list.add(instanceId); + boolean present_name = true && (isSetName()); list.add(present_name); if (present_name) @@ -465,6 +529,16 @@ public int compareTo(Metric other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetInstanceId()).compareTo(other.isSetInstanceId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetInstanceId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.instanceId, other.instanceId); + if (lastComparison != 0) { + return lastComparison; + } + } lastComparison = Boolean.valueOf(isSetName()).compareTo(other.isSetName()); if (lastComparison != 0) { return lastComparison; @@ -521,6 +595,14 @@ public String toString() { } first = false; if (!first) sb.append(", "); + sb.append("instanceId:"); + if (this.instanceId == null) { + sb.append("null"); + } else { + sb.append(this.instanceId); + } + first = false; + if (!first) sb.append(", "); sb.append("name:"); if (this.name == null) { sb.append("null"); @@ -593,7 +675,15 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, Metric struct) thro org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 3: // NAME + case 3: // INSTANCE_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.instanceId = iprot.readString(); + struct.setInstanceIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 4: // NAME if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { struct.name = iprot.readString(); struct.setNameIsSet(true); @@ -601,7 +691,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, Metric struct) thro org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 4: // VALUE + case 5: // VALUE if (schemeField.type == org.apache.thrift.protocol.TType.I64) { struct.value = iprot.readI64(); struct.setValueIsSet(true); @@ -634,6 +724,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, Metric struct) thr oprot.writeString(struct.hostname); oprot.writeFieldEnd(); } + if (struct.instanceId != null) { + oprot.writeFieldBegin(INSTANCE_ID_FIELD_DESC); + oprot.writeString(struct.instanceId); + oprot.writeFieldEnd(); + } if (struct.name != null) { oprot.writeFieldBegin(NAME_FIELD_DESC); oprot.writeString(struct.name); @@ -666,19 +761,25 @@ public void write(org.apache.thrift.protocol.TProtocol prot, Metric struct) thro if (struct.isSetHostname()) { optionals.set(1); } - if (struct.isSetName()) { + if (struct.isSetInstanceId()) { optionals.set(2); } - if (struct.isSetValue()) { + if (struct.isSetName()) { optionals.set(3); } - oprot.writeBitSet(optionals, 4); + if (struct.isSetValue()) { + optionals.set(4); + } + oprot.writeBitSet(optionals, 5); if (struct.isSetInstance()) { oprot.writeString(struct.instance); } if (struct.isSetHostname()) { oprot.writeString(struct.hostname); } + if (struct.isSetInstanceId()) { + oprot.writeString(struct.instanceId); + } if (struct.isSetName()) { oprot.writeString(struct.name); } @@ -690,7 +791,7 @@ public void write(org.apache.thrift.protocol.TProtocol prot, Metric struct) thro @Override public void read(org.apache.thrift.protocol.TProtocol prot, Metric struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; - BitSet incoming = iprot.readBitSet(4); + BitSet incoming = iprot.readBitSet(5); if (incoming.get(0)) { struct.instance = iprot.readString(); struct.setInstanceIsSet(true); @@ -700,10 +801,14 @@ public void read(org.apache.thrift.protocol.TProtocol prot, Metric struct) throw struct.setHostnameIsSet(true); } if (incoming.get(2)) { + struct.instanceId = iprot.readString(); + struct.setInstanceIdIsSet(true); + } + if (incoming.get(3)) { struct.name = iprot.readString(); struct.setNameIsSet(true); } - if (incoming.get(3)) { + if (incoming.get(4)) { struct.value = iprot.readI64(); struct.setValueIsSet(true); } diff --git a/core/common/src/main/java/alluxio/thrift/MetricsHeartbeatTOptions.java b/core/common/src/main/java/alluxio/thrift/MetricsHeartbeatTOptions.java new file mode 100644 index 000000000000..9548bed9a076 --- /dev/null +++ b/core/common/src/main/java/alluxio/thrift/MetricsHeartbeatTOptions.java @@ -0,0 +1,452 @@ +/** + * Autogenerated by Thrift Compiler (0.9.3) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package alluxio.thrift; + +import org.apache.thrift.scheme.IScheme; +import org.apache.thrift.scheme.SchemeFactory; +import org.apache.thrift.scheme.StandardScheme; + +import org.apache.thrift.scheme.TupleScheme; +import org.apache.thrift.protocol.TTupleProtocol; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.server.AbstractNonblockingServer.*; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import javax.annotation.Generated; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)") +public class MetricsHeartbeatTOptions implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("MetricsHeartbeatTOptions"); + + private static final org.apache.thrift.protocol.TField METRICS_FIELD_DESC = new org.apache.thrift.protocol.TField("metrics", org.apache.thrift.protocol.TType.LIST, (short)1); + + private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new MetricsHeartbeatTOptionsStandardSchemeFactory()); + schemes.put(TupleScheme.class, new MetricsHeartbeatTOptionsTupleSchemeFactory()); + } + + private List metrics; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + METRICS((short)1, "metrics"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // METRICS + return METRICS; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.METRICS, new org.apache.thrift.meta_data.FieldMetaData("metrics", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, alluxio.thrift.Metric.class)))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(MetricsHeartbeatTOptions.class, metaDataMap); + } + + public MetricsHeartbeatTOptions() { + } + + public MetricsHeartbeatTOptions( + List metrics) + { + this(); + this.metrics = metrics; + } + + /** + * Performs a deep copy on other. + */ + public MetricsHeartbeatTOptions(MetricsHeartbeatTOptions other) { + if (other.isSetMetrics()) { + List __this__metrics = new ArrayList(other.metrics.size()); + for (alluxio.thrift.Metric other_element : other.metrics) { + __this__metrics.add(new alluxio.thrift.Metric(other_element)); + } + this.metrics = __this__metrics; + } + } + + public MetricsHeartbeatTOptions deepCopy() { + return new MetricsHeartbeatTOptions(this); + } + + @Override + public void clear() { + this.metrics = null; + } + + public int getMetricsSize() { + return (this.metrics == null) ? 0 : this.metrics.size(); + } + + public java.util.Iterator getMetricsIterator() { + return (this.metrics == null) ? null : this.metrics.iterator(); + } + + public void addToMetrics(alluxio.thrift.Metric elem) { + if (this.metrics == null) { + this.metrics = new ArrayList(); + } + this.metrics.add(elem); + } + + public List getMetrics() { + return this.metrics; + } + + public MetricsHeartbeatTOptions setMetrics(List metrics) { + this.metrics = metrics; + return this; + } + + public void unsetMetrics() { + this.metrics = null; + } + + /** Returns true if field metrics is set (has been assigned a value) and false otherwise */ + public boolean isSetMetrics() { + return this.metrics != null; + } + + public void setMetricsIsSet(boolean value) { + if (!value) { + this.metrics = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case METRICS: + if (value == null) { + unsetMetrics(); + } else { + setMetrics((List)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case METRICS: + return getMetrics(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case METRICS: + return isSetMetrics(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof MetricsHeartbeatTOptions) + return this.equals((MetricsHeartbeatTOptions)that); + return false; + } + + public boolean equals(MetricsHeartbeatTOptions that) { + if (that == null) + return false; + + boolean this_present_metrics = true && this.isSetMetrics(); + boolean that_present_metrics = true && that.isSetMetrics(); + if (this_present_metrics || that_present_metrics) { + if (!(this_present_metrics && that_present_metrics)) + return false; + if (!this.metrics.equals(that.metrics)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + List list = new ArrayList(); + + boolean present_metrics = true && (isSetMetrics()); + list.add(present_metrics); + if (present_metrics) + list.add(metrics); + + return list.hashCode(); + } + + @Override + public int compareTo(MetricsHeartbeatTOptions other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetMetrics()).compareTo(other.isSetMetrics()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetMetrics()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.metrics, other.metrics); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("MetricsHeartbeatTOptions("); + boolean first = true; + + sb.append("metrics:"); + if (this.metrics == null) { + sb.append("null"); + } else { + sb.append(this.metrics); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class MetricsHeartbeatTOptionsStandardSchemeFactory implements SchemeFactory { + public MetricsHeartbeatTOptionsStandardScheme getScheme() { + return new MetricsHeartbeatTOptionsStandardScheme(); + } + } + + private static class MetricsHeartbeatTOptionsStandardScheme extends StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, MetricsHeartbeatTOptions struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // METRICS + if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { + { + org.apache.thrift.protocol.TList _list0 = iprot.readListBegin(); + struct.metrics = new ArrayList(_list0.size); + alluxio.thrift.Metric _elem1; + for (int _i2 = 0; _i2 < _list0.size; ++_i2) + { + _elem1 = new alluxio.thrift.Metric(); + _elem1.read(iprot); + struct.metrics.add(_elem1); + } + iprot.readListEnd(); + } + struct.setMetricsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, MetricsHeartbeatTOptions struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.metrics != null) { + oprot.writeFieldBegin(METRICS_FIELD_DESC); + { + oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.metrics.size())); + for (alluxio.thrift.Metric _iter3 : struct.metrics) + { + _iter3.write(oprot); + } + oprot.writeListEnd(); + } + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class MetricsHeartbeatTOptionsTupleSchemeFactory implements SchemeFactory { + public MetricsHeartbeatTOptionsTupleScheme getScheme() { + return new MetricsHeartbeatTOptionsTupleScheme(); + } + } + + private static class MetricsHeartbeatTOptionsTupleScheme extends TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, MetricsHeartbeatTOptions struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetMetrics()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetMetrics()) { + { + oprot.writeI32(struct.metrics.size()); + for (alluxio.thrift.Metric _iter4 : struct.metrics) + { + _iter4.write(oprot); + } + } + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, MetricsHeartbeatTOptions struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + { + org.apache.thrift.protocol.TList _list5 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.metrics = new ArrayList(_list5.size); + alluxio.thrift.Metric _elem6; + for (int _i7 = 0; _i7 < _list5.size; ++_i7) + { + _elem6 = new alluxio.thrift.Metric(); + _elem6.read(iprot); + struct.metrics.add(_elem6); + } + } + struct.setMetricsIsSet(true); + } + } + } + +} + diff --git a/core/common/src/main/java/alluxio/thrift/MetricsHeartbeatTResponse.java b/core/common/src/main/java/alluxio/thrift/MetricsHeartbeatTResponse.java new file mode 100644 index 000000000000..99e5f8f511be --- /dev/null +++ b/core/common/src/main/java/alluxio/thrift/MetricsHeartbeatTResponse.java @@ -0,0 +1,285 @@ +/** + * Autogenerated by Thrift Compiler (0.9.3) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package alluxio.thrift; + +import org.apache.thrift.scheme.IScheme; +import org.apache.thrift.scheme.SchemeFactory; +import org.apache.thrift.scheme.StandardScheme; + +import org.apache.thrift.scheme.TupleScheme; +import org.apache.thrift.protocol.TTupleProtocol; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.server.AbstractNonblockingServer.*; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import javax.annotation.Generated; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)") +public class MetricsHeartbeatTResponse implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("MetricsHeartbeatTResponse"); + + + private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new MetricsHeartbeatTResponseStandardSchemeFactory()); + schemes.put(TupleScheme.class, new MetricsHeartbeatTResponseTupleSchemeFactory()); + } + + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { +; + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(MetricsHeartbeatTResponse.class, metaDataMap); + } + + public MetricsHeartbeatTResponse() { + } + + /** + * Performs a deep copy on other. + */ + public MetricsHeartbeatTResponse(MetricsHeartbeatTResponse other) { + } + + public MetricsHeartbeatTResponse deepCopy() { + return new MetricsHeartbeatTResponse(this); + } + + @Override + public void clear() { + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof MetricsHeartbeatTResponse) + return this.equals((MetricsHeartbeatTResponse)that); + return false; + } + + public boolean equals(MetricsHeartbeatTResponse that) { + if (that == null) + return false; + + return true; + } + + @Override + public int hashCode() { + List list = new ArrayList(); + + return list.hashCode(); + } + + @Override + public int compareTo(MetricsHeartbeatTResponse other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("MetricsHeartbeatTResponse("); + boolean first = true; + + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class MetricsHeartbeatTResponseStandardSchemeFactory implements SchemeFactory { + public MetricsHeartbeatTResponseStandardScheme getScheme() { + return new MetricsHeartbeatTResponseStandardScheme(); + } + } + + private static class MetricsHeartbeatTResponseStandardScheme extends StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, MetricsHeartbeatTResponse struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, MetricsHeartbeatTResponse struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class MetricsHeartbeatTResponseTupleSchemeFactory implements SchemeFactory { + public MetricsHeartbeatTResponseTupleScheme getScheme() { + return new MetricsHeartbeatTResponseTupleScheme(); + } + } + + private static class MetricsHeartbeatTResponseTupleScheme extends TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, MetricsHeartbeatTResponse struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, MetricsHeartbeatTResponse struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + } + } + +} + diff --git a/core/common/src/main/java/alluxio/thrift/MetricsMasterClientService.java b/core/common/src/main/java/alluxio/thrift/MetricsMasterClientService.java new file mode 100644 index 000000000000..744f6f0ccd8f --- /dev/null +++ b/core/common/src/main/java/alluxio/thrift/MetricsMasterClientService.java @@ -0,0 +1,1361 @@ +/** + * Autogenerated by Thrift Compiler (0.9.3) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package alluxio.thrift; + +import org.apache.thrift.scheme.IScheme; +import org.apache.thrift.scheme.SchemeFactory; +import org.apache.thrift.scheme.StandardScheme; + +import org.apache.thrift.scheme.TupleScheme; +import org.apache.thrift.protocol.TTupleProtocol; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.server.AbstractNonblockingServer.*; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import javax.annotation.Generated; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)") +public class MetricsMasterClientService { + + /** + * This interface contains metrics master service endpoints for Alluxio clients. + */ + public interface Iface extends alluxio.thrift.AlluxioService.Iface { + + /** + * Periodic metrics master client heartbeat. + * + * @param clientId the id of the client + * + * @param hostname the client hostname + * + * @param options the method options + */ + public MetricsHeartbeatTResponse metricsHeartbeat(String clientId, String hostname, MetricsHeartbeatTOptions options) throws alluxio.thrift.AlluxioTException, org.apache.thrift.TException; + + } + + public interface AsyncIface extends alluxio.thrift.AlluxioService .AsyncIface { + + public void metricsHeartbeat(String clientId, String hostname, MetricsHeartbeatTOptions options, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + + } + + public static class Client extends alluxio.thrift.AlluxioService.Client implements Iface { + public static class Factory implements org.apache.thrift.TServiceClientFactory { + public Factory() {} + public Client getClient(org.apache.thrift.protocol.TProtocol prot) { + return new Client(prot); + } + public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) { + return new Client(iprot, oprot); + } + } + + public Client(org.apache.thrift.protocol.TProtocol prot) + { + super(prot, prot); + } + + public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) { + super(iprot, oprot); + } + + public MetricsHeartbeatTResponse metricsHeartbeat(String clientId, String hostname, MetricsHeartbeatTOptions options) throws alluxio.thrift.AlluxioTException, org.apache.thrift.TException + { + send_metricsHeartbeat(clientId, hostname, options); + return recv_metricsHeartbeat(); + } + + public void send_metricsHeartbeat(String clientId, String hostname, MetricsHeartbeatTOptions options) throws org.apache.thrift.TException + { + metricsHeartbeat_args args = new metricsHeartbeat_args(); + args.setClientId(clientId); + args.setHostname(hostname); + args.setOptions(options); + sendBase("metricsHeartbeat", args); + } + + public MetricsHeartbeatTResponse recv_metricsHeartbeat() throws alluxio.thrift.AlluxioTException, org.apache.thrift.TException + { + metricsHeartbeat_result result = new metricsHeartbeat_result(); + receiveBase(result, "metricsHeartbeat"); + if (result.isSetSuccess()) { + return result.success; + } + if (result.e != null) { + throw result.e; + } + throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "metricsHeartbeat failed: unknown result"); + } + + } + public static class AsyncClient extends alluxio.thrift.AlluxioService.AsyncClient implements AsyncIface { + public static class Factory implements org.apache.thrift.async.TAsyncClientFactory { + private org.apache.thrift.async.TAsyncClientManager clientManager; + private org.apache.thrift.protocol.TProtocolFactory protocolFactory; + public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) { + this.clientManager = clientManager; + this.protocolFactory = protocolFactory; + } + public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) { + return new AsyncClient(protocolFactory, clientManager, transport); + } + } + + public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) { + super(protocolFactory, clientManager, transport); + } + + public void metricsHeartbeat(String clientId, String hostname, MetricsHeartbeatTOptions options, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + checkReady(); + metricsHeartbeat_call method_call = new metricsHeartbeat_call(clientId, hostname, options, resultHandler, this, ___protocolFactory, ___transport); + this.___currentMethod = method_call; + ___manager.call(method_call); + } + + public static class metricsHeartbeat_call extends org.apache.thrift.async.TAsyncMethodCall { + private String clientId; + private String hostname; + private MetricsHeartbeatTOptions options; + public metricsHeartbeat_call(String clientId, String hostname, MetricsHeartbeatTOptions options, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + super(client, protocolFactory, transport, resultHandler, false); + this.clientId = clientId; + this.hostname = hostname; + this.options = options; + } + + public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { + prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("metricsHeartbeat", org.apache.thrift.protocol.TMessageType.CALL, 0)); + metricsHeartbeat_args args = new metricsHeartbeat_args(); + args.setClientId(clientId); + args.setHostname(hostname); + args.setOptions(options); + args.write(prot); + prot.writeMessageEnd(); + } + + public MetricsHeartbeatTResponse getResult() throws alluxio.thrift.AlluxioTException, org.apache.thrift.TException { + if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { + throw new IllegalStateException("Method call not finished!"); + } + org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); + org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + return (new Client(prot)).recv_metricsHeartbeat(); + } + } + + } + + public static class Processor extends alluxio.thrift.AlluxioService.Processor implements org.apache.thrift.TProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName()); + public Processor(I iface) { + super(iface, getProcessMap(new HashMap>())); + } + + protected Processor(I iface, Map> processMap) { + super(iface, getProcessMap(processMap)); + } + + private static Map> getProcessMap(Map> processMap) { + processMap.put("metricsHeartbeat", new metricsHeartbeat()); + return processMap; + } + + public static class metricsHeartbeat extends org.apache.thrift.ProcessFunction { + public metricsHeartbeat() { + super("metricsHeartbeat"); + } + + public metricsHeartbeat_args getEmptyArgsInstance() { + return new metricsHeartbeat_args(); + } + + protected boolean isOneway() { + return false; + } + + public metricsHeartbeat_result getResult(I iface, metricsHeartbeat_args args) throws org.apache.thrift.TException { + metricsHeartbeat_result result = new metricsHeartbeat_result(); + try { + result.success = iface.metricsHeartbeat(args.clientId, args.hostname, args.options); + } catch (alluxio.thrift.AlluxioTException e) { + result.e = e; + } + return result; + } + } + + } + + public static class AsyncProcessor extends alluxio.thrift.AlluxioService.AsyncProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class.getName()); + public AsyncProcessor(I iface) { + super(iface, getProcessMap(new HashMap>())); + } + + protected AsyncProcessor(I iface, Map> processMap) { + super(iface, getProcessMap(processMap)); + } + + private static Map> getProcessMap(Map> processMap) { + processMap.put("metricsHeartbeat", new metricsHeartbeat()); + return processMap; + } + + public static class metricsHeartbeat extends org.apache.thrift.AsyncProcessFunction { + public metricsHeartbeat() { + super("metricsHeartbeat"); + } + + public metricsHeartbeat_args getEmptyArgsInstance() { + return new metricsHeartbeat_args(); + } + + public AsyncMethodCallback getResultHandler(final AsyncFrameBuffer fb, final int seqid) { + final org.apache.thrift.AsyncProcessFunction fcall = this; + return new AsyncMethodCallback() { + public void onComplete(MetricsHeartbeatTResponse o) { + metricsHeartbeat_result result = new metricsHeartbeat_result(); + result.success = o; + try { + fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid); + return; + } catch (Exception e) { + LOGGER.error("Exception writing to internal frame buffer", e); + } + fb.close(); + } + public void onError(Exception e) { + byte msgType = org.apache.thrift.protocol.TMessageType.REPLY; + org.apache.thrift.TBase msg; + metricsHeartbeat_result result = new metricsHeartbeat_result(); + if (e instanceof alluxio.thrift.AlluxioTException) { + result.e = (alluxio.thrift.AlluxioTException) e; + result.setEIsSet(true); + msg = result; + } + else + { + msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; + msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage()); + } + try { + fcall.sendResponse(fb,msg,msgType,seqid); + return; + } catch (Exception ex) { + LOGGER.error("Exception writing to internal frame buffer", ex); + } + fb.close(); + } + }; + } + + protected boolean isOneway() { + return false; + } + + public void start(I iface, metricsHeartbeat_args args, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws TException { + iface.metricsHeartbeat(args.clientId, args.hostname, args.options,resultHandler); + } + } + + } + + public static class metricsHeartbeat_args implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("metricsHeartbeat_args"); + + private static final org.apache.thrift.protocol.TField CLIENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("clientId", org.apache.thrift.protocol.TType.STRING, (short)1); + private static final org.apache.thrift.protocol.TField HOSTNAME_FIELD_DESC = new org.apache.thrift.protocol.TField("hostname", org.apache.thrift.protocol.TType.STRING, (short)2); + private static final org.apache.thrift.protocol.TField OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("options", org.apache.thrift.protocol.TType.STRUCT, (short)3); + + private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new metricsHeartbeat_argsStandardSchemeFactory()); + schemes.put(TupleScheme.class, new metricsHeartbeat_argsTupleSchemeFactory()); + } + + private String clientId; // required + private String hostname; // required + private MetricsHeartbeatTOptions options; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + /** + * the id of the client + */ + CLIENT_ID((short)1, "clientId"), + /** + * the client hostname + */ + HOSTNAME((short)2, "hostname"), + /** + * the method options + */ + OPTIONS((short)3, "options"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // CLIENT_ID + return CLIENT_ID; + case 2: // HOSTNAME + return HOSTNAME; + case 3: // OPTIONS + return OPTIONS; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.CLIENT_ID, new org.apache.thrift.meta_data.FieldMetaData("clientId", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.HOSTNAME, new org.apache.thrift.meta_data.FieldMetaData("hostname", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.OPTIONS, new org.apache.thrift.meta_data.FieldMetaData("options", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, MetricsHeartbeatTOptions.class))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(metricsHeartbeat_args.class, metaDataMap); + } + + public metricsHeartbeat_args() { + } + + public metricsHeartbeat_args( + String clientId, + String hostname, + MetricsHeartbeatTOptions options) + { + this(); + this.clientId = clientId; + this.hostname = hostname; + this.options = options; + } + + /** + * Performs a deep copy on other. + */ + public metricsHeartbeat_args(metricsHeartbeat_args other) { + if (other.isSetClientId()) { + this.clientId = other.clientId; + } + if (other.isSetHostname()) { + this.hostname = other.hostname; + } + if (other.isSetOptions()) { + this.options = new MetricsHeartbeatTOptions(other.options); + } + } + + public metricsHeartbeat_args deepCopy() { + return new metricsHeartbeat_args(this); + } + + @Override + public void clear() { + this.clientId = null; + this.hostname = null; + this.options = null; + } + + /** + * the id of the client + */ + public String getClientId() { + return this.clientId; + } + + /** + * the id of the client + */ + public metricsHeartbeat_args setClientId(String clientId) { + this.clientId = clientId; + return this; + } + + public void unsetClientId() { + this.clientId = null; + } + + /** Returns true if field clientId is set (has been assigned a value) and false otherwise */ + public boolean isSetClientId() { + return this.clientId != null; + } + + public void setClientIdIsSet(boolean value) { + if (!value) { + this.clientId = null; + } + } + + /** + * the client hostname + */ + public String getHostname() { + return this.hostname; + } + + /** + * the client hostname + */ + public metricsHeartbeat_args setHostname(String hostname) { + this.hostname = hostname; + return this; + } + + public void unsetHostname() { + this.hostname = null; + } + + /** Returns true if field hostname is set (has been assigned a value) and false otherwise */ + public boolean isSetHostname() { + return this.hostname != null; + } + + public void setHostnameIsSet(boolean value) { + if (!value) { + this.hostname = null; + } + } + + /** + * the method options + */ + public MetricsHeartbeatTOptions getOptions() { + return this.options; + } + + /** + * the method options + */ + public metricsHeartbeat_args setOptions(MetricsHeartbeatTOptions options) { + this.options = options; + return this; + } + + public void unsetOptions() { + this.options = null; + } + + /** Returns true if field options is set (has been assigned a value) and false otherwise */ + public boolean isSetOptions() { + return this.options != null; + } + + public void setOptionsIsSet(boolean value) { + if (!value) { + this.options = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case CLIENT_ID: + if (value == null) { + unsetClientId(); + } else { + setClientId((String)value); + } + break; + + case HOSTNAME: + if (value == null) { + unsetHostname(); + } else { + setHostname((String)value); + } + break; + + case OPTIONS: + if (value == null) { + unsetOptions(); + } else { + setOptions((MetricsHeartbeatTOptions)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case CLIENT_ID: + return getClientId(); + + case HOSTNAME: + return getHostname(); + + case OPTIONS: + return getOptions(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case CLIENT_ID: + return isSetClientId(); + case HOSTNAME: + return isSetHostname(); + case OPTIONS: + return isSetOptions(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof metricsHeartbeat_args) + return this.equals((metricsHeartbeat_args)that); + return false; + } + + public boolean equals(metricsHeartbeat_args that) { + if (that == null) + return false; + + boolean this_present_clientId = true && this.isSetClientId(); + boolean that_present_clientId = true && that.isSetClientId(); + if (this_present_clientId || that_present_clientId) { + if (!(this_present_clientId && that_present_clientId)) + return false; + if (!this.clientId.equals(that.clientId)) + return false; + } + + boolean this_present_hostname = true && this.isSetHostname(); + boolean that_present_hostname = true && that.isSetHostname(); + if (this_present_hostname || that_present_hostname) { + if (!(this_present_hostname && that_present_hostname)) + return false; + if (!this.hostname.equals(that.hostname)) + return false; + } + + boolean this_present_options = true && this.isSetOptions(); + boolean that_present_options = true && that.isSetOptions(); + if (this_present_options || that_present_options) { + if (!(this_present_options && that_present_options)) + return false; + if (!this.options.equals(that.options)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + List list = new ArrayList(); + + boolean present_clientId = true && (isSetClientId()); + list.add(present_clientId); + if (present_clientId) + list.add(clientId); + + boolean present_hostname = true && (isSetHostname()); + list.add(present_hostname); + if (present_hostname) + list.add(hostname); + + boolean present_options = true && (isSetOptions()); + list.add(present_options); + if (present_options) + list.add(options); + + return list.hashCode(); + } + + @Override + public int compareTo(metricsHeartbeat_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetClientId()).compareTo(other.isSetClientId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetClientId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.clientId, other.clientId); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetHostname()).compareTo(other.isSetHostname()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetHostname()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.hostname, other.hostname); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetOptions()).compareTo(other.isSetOptions()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetOptions()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.options, other.options); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("metricsHeartbeat_args("); + boolean first = true; + + sb.append("clientId:"); + if (this.clientId == null) { + sb.append("null"); + } else { + sb.append(this.clientId); + } + first = false; + if (!first) sb.append(", "); + sb.append("hostname:"); + if (this.hostname == null) { + sb.append("null"); + } else { + sb.append(this.hostname); + } + first = false; + if (!first) sb.append(", "); + sb.append("options:"); + if (this.options == null) { + sb.append("null"); + } else { + sb.append(this.options); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + if (options != null) { + options.validate(); + } + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class metricsHeartbeat_argsStandardSchemeFactory implements SchemeFactory { + public metricsHeartbeat_argsStandardScheme getScheme() { + return new metricsHeartbeat_argsStandardScheme(); + } + } + + private static class metricsHeartbeat_argsStandardScheme extends StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, metricsHeartbeat_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // CLIENT_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.clientId = iprot.readString(); + struct.setClientIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // HOSTNAME + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.hostname = iprot.readString(); + struct.setHostnameIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 3: // OPTIONS + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.options = new MetricsHeartbeatTOptions(); + struct.options.read(iprot); + struct.setOptionsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, metricsHeartbeat_args struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.clientId != null) { + oprot.writeFieldBegin(CLIENT_ID_FIELD_DESC); + oprot.writeString(struct.clientId); + oprot.writeFieldEnd(); + } + if (struct.hostname != null) { + oprot.writeFieldBegin(HOSTNAME_FIELD_DESC); + oprot.writeString(struct.hostname); + oprot.writeFieldEnd(); + } + if (struct.options != null) { + oprot.writeFieldBegin(OPTIONS_FIELD_DESC); + struct.options.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class metricsHeartbeat_argsTupleSchemeFactory implements SchemeFactory { + public metricsHeartbeat_argsTupleScheme getScheme() { + return new metricsHeartbeat_argsTupleScheme(); + } + } + + private static class metricsHeartbeat_argsTupleScheme extends TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, metricsHeartbeat_args struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetClientId()) { + optionals.set(0); + } + if (struct.isSetHostname()) { + optionals.set(1); + } + if (struct.isSetOptions()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); + if (struct.isSetClientId()) { + oprot.writeString(struct.clientId); + } + if (struct.isSetHostname()) { + oprot.writeString(struct.hostname); + } + if (struct.isSetOptions()) { + struct.options.write(oprot); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, metricsHeartbeat_args struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(3); + if (incoming.get(0)) { + struct.clientId = iprot.readString(); + struct.setClientIdIsSet(true); + } + if (incoming.get(1)) { + struct.hostname = iprot.readString(); + struct.setHostnameIsSet(true); + } + if (incoming.get(2)) { + struct.options = new MetricsHeartbeatTOptions(); + struct.options.read(iprot); + struct.setOptionsIsSet(true); + } + } + } + + } + + public static class metricsHeartbeat_result implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("metricsHeartbeat_result"); + + private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRUCT, (short)0); + private static final org.apache.thrift.protocol.TField E_FIELD_DESC = new org.apache.thrift.protocol.TField("e", org.apache.thrift.protocol.TType.STRUCT, (short)1); + + private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new metricsHeartbeat_resultStandardSchemeFactory()); + schemes.put(TupleScheme.class, new metricsHeartbeat_resultTupleSchemeFactory()); + } + + private MetricsHeartbeatTResponse success; // required + private alluxio.thrift.AlluxioTException e; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + SUCCESS((short)0, "success"), + E((short)1, "e"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 0: // SUCCESS + return SUCCESS; + case 1: // E + return E; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, MetricsHeartbeatTResponse.class))); + tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(metricsHeartbeat_result.class, metaDataMap); + } + + public metricsHeartbeat_result() { + } + + public metricsHeartbeat_result( + MetricsHeartbeatTResponse success, + alluxio.thrift.AlluxioTException e) + { + this(); + this.success = success; + this.e = e; + } + + /** + * Performs a deep copy on other. + */ + public metricsHeartbeat_result(metricsHeartbeat_result other) { + if (other.isSetSuccess()) { + this.success = new MetricsHeartbeatTResponse(other.success); + } + if (other.isSetE()) { + this.e = new alluxio.thrift.AlluxioTException(other.e); + } + } + + public metricsHeartbeat_result deepCopy() { + return new metricsHeartbeat_result(this); + } + + @Override + public void clear() { + this.success = null; + this.e = null; + } + + public MetricsHeartbeatTResponse getSuccess() { + return this.success; + } + + public metricsHeartbeat_result setSuccess(MetricsHeartbeatTResponse success) { + this.success = success; + return this; + } + + public void unsetSuccess() { + this.success = null; + } + + /** Returns true if field success is set (has been assigned a value) and false otherwise */ + public boolean isSetSuccess() { + return this.success != null; + } + + public void setSuccessIsSet(boolean value) { + if (!value) { + this.success = null; + } + } + + public alluxio.thrift.AlluxioTException getE() { + return this.e; + } + + public metricsHeartbeat_result setE(alluxio.thrift.AlluxioTException e) { + this.e = e; + return this; + } + + public void unsetE() { + this.e = null; + } + + /** Returns true if field e is set (has been assigned a value) and false otherwise */ + public boolean isSetE() { + return this.e != null; + } + + public void setEIsSet(boolean value) { + if (!value) { + this.e = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case SUCCESS: + if (value == null) { + unsetSuccess(); + } else { + setSuccess((MetricsHeartbeatTResponse)value); + } + break; + + case E: + if (value == null) { + unsetE(); + } else { + setE((alluxio.thrift.AlluxioTException)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case SUCCESS: + return getSuccess(); + + case E: + return getE(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case SUCCESS: + return isSetSuccess(); + case E: + return isSetE(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof metricsHeartbeat_result) + return this.equals((metricsHeartbeat_result)that); + return false; + } + + public boolean equals(metricsHeartbeat_result that) { + if (that == null) + return false; + + boolean this_present_success = true && this.isSetSuccess(); + boolean that_present_success = true && that.isSetSuccess(); + if (this_present_success || that_present_success) { + if (!(this_present_success && that_present_success)) + return false; + if (!this.success.equals(that.success)) + return false; + } + + boolean this_present_e = true && this.isSetE(); + boolean that_present_e = true && that.isSetE(); + if (this_present_e || that_present_e) { + if (!(this_present_e && that_present_e)) + return false; + if (!this.e.equals(that.e)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + List list = new ArrayList(); + + boolean present_success = true && (isSetSuccess()); + list.add(present_success); + if (present_success) + list.add(success); + + boolean present_e = true && (isSetE()); + list.add(present_e); + if (present_e) + list.add(e); + + return list.hashCode(); + } + + @Override + public int compareTo(metricsHeartbeat_result other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetSuccess()).compareTo(other.isSetSuccess()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetSuccess()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, other.success); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetE()).compareTo(other.isSetE()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetE()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.e, other.e); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("metricsHeartbeat_result("); + boolean first = true; + + sb.append("success:"); + if (this.success == null) { + sb.append("null"); + } else { + sb.append(this.success); + } + first = false; + if (!first) sb.append(", "); + sb.append("e:"); + if (this.e == null) { + sb.append("null"); + } else { + sb.append(this.e); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + if (success != null) { + success.validate(); + } + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class metricsHeartbeat_resultStandardSchemeFactory implements SchemeFactory { + public metricsHeartbeat_resultStandardScheme getScheme() { + return new metricsHeartbeat_resultStandardScheme(); + } + } + + private static class metricsHeartbeat_resultStandardScheme extends StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, metricsHeartbeat_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 0: // SUCCESS + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.success = new MetricsHeartbeatTResponse(); + struct.success.read(iprot); + struct.setSuccessIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 1: // E + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.e = new alluxio.thrift.AlluxioTException(); + struct.e.read(iprot); + struct.setEIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, metricsHeartbeat_result struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.success != null) { + oprot.writeFieldBegin(SUCCESS_FIELD_DESC); + struct.success.write(oprot); + oprot.writeFieldEnd(); + } + if (struct.e != null) { + oprot.writeFieldBegin(E_FIELD_DESC); + struct.e.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class metricsHeartbeat_resultTupleSchemeFactory implements SchemeFactory { + public metricsHeartbeat_resultTupleScheme getScheme() { + return new metricsHeartbeat_resultTupleScheme(); + } + } + + private static class metricsHeartbeat_resultTupleScheme extends TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, metricsHeartbeat_result struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetSuccess()) { + optionals.set(0); + } + if (struct.isSetE()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.isSetSuccess()) { + struct.success.write(oprot); + } + if (struct.isSetE()) { + struct.e.write(oprot); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, metricsHeartbeat_result struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(2); + if (incoming.get(0)) { + struct.success = new MetricsHeartbeatTResponse(); + struct.success.read(iprot); + struct.setSuccessIsSet(true); + } + if (incoming.get(1)) { + struct.e = new alluxio.thrift.AlluxioTException(); + struct.e.read(iprot); + struct.setEIsSet(true); + } + } + } + + } + +} diff --git a/core/common/src/main/java/alluxio/util/IdUtils.java b/core/common/src/main/java/alluxio/util/IdUtils.java index 56317a5c99c7..e07d08d03f66 100644 --- a/core/common/src/main/java/alluxio/util/IdUtils.java +++ b/core/common/src/main/java/alluxio/util/IdUtils.java @@ -91,4 +91,11 @@ public static long createSessionId() { public static long createMountId() { return getRandomNonNegativeLong(); } + + /** + * @return a positive random long + */ + public static long createFileSystemContextId() { + return Math.abs(sRandom.nextLong()); + } } diff --git a/core/common/src/main/java/alluxio/util/ThreadUtils.java b/core/common/src/main/java/alluxio/util/ThreadUtils.java index 07dd67d23927..8a89bc367495 100644 --- a/core/common/src/main/java/alluxio/util/ThreadUtils.java +++ b/core/common/src/main/java/alluxio/util/ThreadUtils.java @@ -11,13 +11,20 @@ package alluxio.util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.PrintWriter; import java.io.StringWriter; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; /** * Utility method for working with threads. */ public final class ThreadUtils { + private static final Logger LOG = LoggerFactory.getLogger(ThreadUtils.class); + /** * @param thread a thread * @return a human-readable representation of the thread's stack trace @@ -30,5 +37,33 @@ public static String formatStackTrace(Thread thread) { return sw.toString(); } + /** + * From https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html + * + * The following method shuts down an ExecutorService in two phases, first by calling shutdown to + * reject incoming tasks, and then calling shutdownNow, if necessary, to cancel any lingering + * tasks. + * + * @param pool the executor service to shutdown + */ + public static void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(1, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(1, TimeUnit.SECONDS)) { + LOG.warn("Pool did not terminate"); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + private ThreadUtils() {} // prevent instantiation of utils class } diff --git a/core/common/src/test/java/alluxio/metrics/MetricTest.java b/core/common/src/test/java/alluxio/metrics/MetricTest.java index c8593616a54f..413910ae89b5 100644 --- a/core/common/src/test/java/alluxio/metrics/MetricTest.java +++ b/core/common/src/test/java/alluxio/metrics/MetricTest.java @@ -33,7 +33,7 @@ public void thrift() { public void checkEquality(Metric a, Metric b) { assertEquals(a.getName(), b.getName()); - assertEquals(a.getInstance(), b.getInstance()); + assertEquals(a.getInstanceType(), b.getInstanceType()); assertEquals(a.getValue(), b.getValue()); assertEquals(a.getHostname(), b.getHostname()); assertEquals(a.getFullMetricName(), b.getFullMetricName()); diff --git a/core/common/src/thrift/common.thrift b/core/common/src/thrift/common.thrift index 109d2a5dd80a..e0a0a508c197 100644 --- a/core/common/src/thrift/common.thrift +++ b/core/common/src/thrift/common.thrift @@ -27,8 +27,9 @@ struct BlockLocation { struct Metric { 1: string instance 2: string hostname - 3: string name - 4: i64 value + 3: string instanceId + 4: string name + 5: i64 value } enum CommandType { diff --git a/core/common/src/thrift/metrics_master.thrift b/core/common/src/thrift/metrics_master.thrift new file mode 100644 index 000000000000..95c69f802dfe --- /dev/null +++ b/core/common/src/thrift/metrics_master.thrift @@ -0,0 +1,26 @@ +namespace java alluxio.thrift + +include "common.thrift" +include "exception.thrift" + +struct MetricsHeartbeatTOptions { + 1: list metrics +} + +struct MetricsHeartbeatTResponse {} + +/** + * This interface contains metrics master service endpoints for Alluxio clients. + */ +service MetricsMasterClientService extends common.AlluxioService { + + /** + * Periodic metrics master client heartbeat. + */ + MetricsHeartbeatTResponse metricsHeartbeat( + /** the id of the client */ 1: string clientId, + /** the client hostname */ 2: string hostname, + /** the method options */ 3: MetricsHeartbeatTOptions options, + ) + throws (1: exception.AlluxioTException e) +} diff --git a/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java b/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java index 6b714e221c01..b3e46e4fe6f0 100644 --- a/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java +++ b/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java @@ -770,7 +770,7 @@ private void processWorkerMetrics(String hostname, List metrics) { if (metrics.isEmpty()) { return; } - mMetricsMaster.putWorkerMetrics(MetricsSystem.InstanceType.WORKER, hostname, metrics); + mMetricsMaster.workerHeartbeat(hostname, metrics); } /** diff --git a/core/server/master/src/main/java/alluxio/master/metrics/DefaultMetricsMaster.java b/core/server/master/src/main/java/alluxio/master/metrics/DefaultMetricsMaster.java index 5d82e9bdb0e9..c47be53847d2 100644 --- a/core/server/master/src/main/java/alluxio/master/metrics/DefaultMetricsMaster.java +++ b/core/server/master/src/main/java/alluxio/master/metrics/DefaultMetricsMaster.java @@ -15,14 +15,15 @@ import alluxio.clock.SystemClock; import alluxio.master.AbstractMaster; import alluxio.master.MasterContext; +import alluxio.metrics.ClientMetrics; import alluxio.metrics.Metric; import alluxio.metrics.MetricsAggregator; import alluxio.metrics.MetricsFilter; import alluxio.metrics.MetricsSystem; -import alluxio.metrics.MetricsSystem.InstanceType; import alluxio.metrics.WorkerMetrics; import alluxio.metrics.aggregator.SumInstancesAggregator; import alluxio.proto.journal.Journal.JournalEntry; +import alluxio.thrift.MetricsMasterClientService; import alluxio.util.executor.ExecutorServiceFactories; import alluxio.util.executor.ExecutorServiceFactory; @@ -89,8 +90,12 @@ public Object getValue() { } private void registerAggregators() { + // worker metrics addAggregator(new SumInstancesAggregator(MetricsSystem.InstanceType.WORKER, WorkerMetrics.BYTES_READ_ALLUXIO)); + // client metrics + addAggregator(new SumInstancesAggregator(MetricsSystem.InstanceType.CLIENT, + ClientMetrics.BYTES_READ_LOCAL)); } @Override @@ -116,6 +121,8 @@ public Iterator getJournalEntryIterator() { @Override public Map getServices() { Map services = new HashMap<>(); + services.put(Constants.METRICS_MASTER_CLIENT_SERVICE_NAME, + new MetricsMasterClientService.Processor<>(getMasterServiceHandler())); return services; } @@ -125,7 +132,17 @@ public void start(Boolean isLeader) throws IOException { } @Override - public void putWorkerMetrics(InstanceType instance, String hostname, List metrics) { - mMetricsStore.putWorkerMetrics(instance, hostname, metrics); + public void clientHeartbeat(String clientId, String hostname, List metrics) { + mMetricsStore.putClientMetrics(hostname, clientId, metrics); + } + + @Override + public MetricsMasterClientServiceHandler getMasterServiceHandler() { + return new MetricsMasterClientServiceHandler(this); + } + + @Override + public void workerHeartbeat(String hostname, List metrics) { + mMetricsStore.putWorkerMetrics(hostname, metrics); } } diff --git a/core/server/master/src/main/java/alluxio/master/metrics/MetricsMaster.java b/core/server/master/src/main/java/alluxio/master/metrics/MetricsMaster.java index 49c0775b4569..e7992f51dd62 100644 --- a/core/server/master/src/main/java/alluxio/master/metrics/MetricsMaster.java +++ b/core/server/master/src/main/java/alluxio/master/metrics/MetricsMaster.java @@ -13,7 +13,6 @@ import alluxio.master.Master; import alluxio.metrics.Metric; -import alluxio.metrics.MetricsSystem; import java.util.List; @@ -23,12 +22,25 @@ */ public interface MetricsMaster extends Master { /** - * Put the metrics from an instance with a hostname. If all the old metrics associated with this - * instance will be removed and then replaced by the latest. + * Handles the client's heartbeat request for metrics collection. + * + * @param clientId the client id + * @param hostname the client hostname + * @param metrics client-side metrics + */ + void clientHeartbeat(String clientId, String hostname, List metrics); + + /** + * @return the master service handler + */ + MetricsMasterClientServiceHandler getMasterServiceHandler(); + + /** + * Handles the worker heartbeat and puts the metrics from an instance with a hostname. If all the + * old metrics associated with this instance will be removed and then replaced by the latest. * - * @param instance the instance type * @param hostname the hostname of the instance * @param metrics the new worker metrics */ - void putWorkerMetrics(MetricsSystem.InstanceType instance, String hostname, List metrics); + void workerHeartbeat(String hostname, List metrics); } diff --git a/core/server/master/src/main/java/alluxio/master/metrics/MetricsMasterClientServiceHandler.java b/core/server/master/src/main/java/alluxio/master/metrics/MetricsMasterClientServiceHandler.java new file mode 100644 index 000000000000..2c040eb392fc --- /dev/null +++ b/core/server/master/src/main/java/alluxio/master/metrics/MetricsMasterClientServiceHandler.java @@ -0,0 +1,80 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.master.metrics; + +import alluxio.Constants; +import alluxio.RpcUtils; +import alluxio.RpcUtils.RpcCallableThrowsIOException; +import alluxio.exception.AlluxioException; +import alluxio.metrics.Metric; +import alluxio.thrift.AlluxioTException; +import alluxio.thrift.GetServiceVersionTOptions; +import alluxio.thrift.GetServiceVersionTResponse; +import alluxio.thrift.MetricsHeartbeatTOptions; +import alluxio.thrift.MetricsHeartbeatTResponse; +import alluxio.thrift.MetricsMasterClientService; + +import com.google.common.base.Preconditions; +import jersey.repackaged.com.google.common.collect.Lists; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +import javax.annotation.concurrent.NotThreadSafe; + +/** + * This class is a Thrift handler for metrics master RPCs invoked by an Alluxio client. + */ +@NotThreadSafe +public final class MetricsMasterClientServiceHandler implements MetricsMasterClientService.Iface { + private static final Logger LOG = + LoggerFactory.getLogger(MetricsMasterClientServiceHandler.class); + + private final MetricsMaster mMetricsMaster; + + MetricsMasterClientServiceHandler(MetricsMaster metricsMaster) { + Preconditions.checkNotNull(metricsMaster, "metricsMaster"); + mMetricsMaster = metricsMaster; + } + + @Override + public GetServiceVersionTResponse getServiceVersion(GetServiceVersionTOptions options) + throws AlluxioTException, TException { + return new GetServiceVersionTResponse(Constants.METRICS_MASTER_CLIENT_SERVICE_VERSION); + } + + @Override + public MetricsHeartbeatTResponse metricsHeartbeat(final String clientId, final String hostname, + final MetricsHeartbeatTOptions options) throws AlluxioTException, TException { + return RpcUtils.call(LOG, new RpcCallableThrowsIOException() { + @Override + public MetricsHeartbeatTResponse call() throws AlluxioException, IOException { + List metrics = Lists.newArrayList(); + for (alluxio.thrift.Metric metric : options.getMetrics()) { + Metric parsed = Metric.from(metric); + metrics.add(parsed); + } + mMetricsMaster.clientHeartbeat(clientId, hostname, metrics); + return new MetricsHeartbeatTResponse(); + } + + @Override + public String toString() { + return String.format("clientHeartbeat: hostname=%s, contextId=%s, " + "options=%s", + hostname, clientId, options); + } + }); + } +} diff --git a/core/server/master/src/main/java/alluxio/master/metrics/MetricsStore.java b/core/server/master/src/main/java/alluxio/master/metrics/MetricsStore.java index 867ca7899be3..ed9d0d60040b 100644 --- a/core/server/master/src/main/java/alluxio/master/metrics/MetricsStore.java +++ b/core/server/master/src/main/java/alluxio/master/metrics/MetricsStore.java @@ -17,6 +17,9 @@ import alluxio.metrics.MetricsSystem; import alluxio.metrics.MetricsSystem.InstanceType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.List; import java.util.Set; @@ -27,6 +30,7 @@ */ @ThreadSafe public class MetricsStore { + private static final Logger LOG = LoggerFactory.getLogger(MetricsStore.class); private static final IndexDefinition FULL_NAME_INDEX = new IndexDefinition(true) { @Override public Object getFieldValue(Metric o) { @@ -41,15 +45,33 @@ public Object getFieldValue(Metric o) { } }; - private static final IndexDefinition HOSTNAME_INDEX = new IndexDefinition(false) { - @Override - public Object getFieldValue(Metric o) { - return o.getHostname(); - } - }; + private static final IndexDefinition ID_INDEX = + new IndexDefinition(false) { + @Override + public Object getFieldValue(Metric o) { + return getFullInstanceId(o.getHostname(), o.getInstanceId()); + } + }; + + /** + * Gets the full instance id of the concatenation of hostname and the id. The dots in the hostname + * replaced by underscores. + * + * @param hostname the hostname + * @param id the instance id + * @return the full instance id of hostname[:id] + */ + private static String getFullInstanceId(String hostname, String id) { + String str = hostname == null ? "" : hostname; + str = str.replace('.', '_'); + str += id == null ? "" : ":" + id; + return str; + } private final IndexedSet mWorkerMetrics = - new IndexedSet<>(FULL_NAME_INDEX, NAME_INDEX, HOSTNAME_INDEX); + new IndexedSet<>(FULL_NAME_INDEX, NAME_INDEX, ID_INDEX); + private final IndexedSet mClientMetrics = + new IndexedSet<>(FULL_NAME_INDEX, NAME_INDEX, ID_INDEX); /** * Gets all the metrics by instance type. The supported instance types are worker and client. @@ -60,28 +82,52 @@ public Object getFieldValue(Metric o) { private IndexedSet getMetricsByInstanceType(MetricsSystem.InstanceType instanceType) { if (instanceType == InstanceType.WORKER) { return mWorkerMetrics; + } else if (instanceType == InstanceType.CLIENT) { + return mClientMetrics; } else { throw new IllegalArgumentException("Unsupported instance type " + instanceType); } } /** - * Put the metrics from an instance with a hostname. If all the old metrics associated with this + * Put the metrics from a worker with a hostname. If all the old metrics associated with this * instance will be removed and then replaced by the latest. * - * @param instance the instance type * @param hostname the hostname of the instance * @param metrics the new worker metrics */ - public synchronized void putWorkerMetrics(MetricsSystem.InstanceType instance, String hostname, + public synchronized void putWorkerMetrics(String hostname, List metrics) { + if (metrics.isEmpty()) { + return; + } + mWorkerMetrics.removeByField(ID_INDEX, hostname); + for (Metric metric : metrics) { + if (metric.getHostname() == null) { + continue; // ignore metrics whose hostname is null + } + mWorkerMetrics.add(metric); + } + } + + /** + * Put the metrics from a client with a hostname and a client id. If all the old metrics + * associated with this instance will be removed and then replaced by the latest. + * + * @param hostname the hostname of the client + * @param clientId the id of the client + * @param metrics the new metrics + */ + public synchronized void putClientMetrics(String hostname, String clientId, List metrics) { - IndexedSet set = getMetricsByInstanceType(instance); - set.removeByField(HOSTNAME_INDEX, hostname); + if (metrics.isEmpty()) { + return; + } + mClientMetrics.removeByField(ID_INDEX, getFullInstanceId(hostname, clientId)); for (Metric metric : metrics) { if (metric.getHostname() == null) { - continue; // ignore metrics who hostname is null + continue; // ignore metrics whose hostname is null } - set.add(metric); + mClientMetrics.add(metric); } } @@ -103,5 +149,6 @@ public synchronized Set getMetricsByInstanceTypeAndName( */ public synchronized void clear() { mWorkerMetrics.clear(); + mClientMetrics.clear(); } } diff --git a/core/server/master/src/test/java/alluxio/master/metrics/MetricsMasterTest.java b/core/server/master/src/test/java/alluxio/master/metrics/MetricsMasterTest.java index 6c285202705d..d6d824d8fd49 100644 --- a/core/server/master/src/test/java/alluxio/master/metrics/MetricsMasterTest.java +++ b/core/server/master/src/test/java/alluxio/master/metrics/MetricsMasterTest.java @@ -71,22 +71,41 @@ public void after() throws Exception { @Test public void testAggregator() { mMetricsMaster - .addAggregator(new SumInstancesAggregator(MetricsSystem.InstanceType.WORKER, "metric1")); + .addAggregator(new SumInstancesAggregator(MetricsSystem.InstanceType.WORKER, "metricA")); mMetricsMaster - .addAggregator(new SumInstancesAggregator(MetricsSystem.InstanceType.WORKER, "metric2")); - List metrics1 = Lists.newArrayList(Metric.from("worker.192_1_1_1.metric1", 10), - Metric.from("worker.192_1_1_1.metric2", 20)); - mMetricsMaster.putWorkerMetrics(MetricsSystem.InstanceType.WORKER, "192_1_1_1", metrics1); - List metrics2 = Lists.newArrayList(Metric.from("worker.192_1_1_2.metric1", 1), - Metric.from("worker.192_1_1_2.metric2", 2)); - mMetricsMaster.putWorkerMetrics(MetricsSystem.InstanceType.WORKER, "192_1_1_2", metrics2); - assertEquals(11L, getGauge("metric1")); - assertEquals(22L, getGauge("metric2")); + .addAggregator(new SumInstancesAggregator(MetricsSystem.InstanceType.WORKER, "metricB")); + List metrics1 = Lists.newArrayList(Metric.from("worker.192_1_1_1.metricA", 10), + Metric.from("worker.192_1_1_1.metricB", 20)); + mMetricsMaster.workerHeartbeat("192_1_1_1", metrics1); + List metrics2 = Lists.newArrayList(Metric.from("worker.192_1_1_2.metricA", 1), + Metric.from("worker.192_1_1_2.metricB", 2)); + mMetricsMaster.workerHeartbeat("192_1_1_2", metrics2); + assertEquals(11L, getGauge("metricA")); + assertEquals(22L, getGauge("metricB")); // override metrics from hostname 192_1_1_2 - List metrics3 = Lists.newArrayList(Metric.from("worker.192_1_1_2.metric1", 3)); - mMetricsMaster.putWorkerMetrics(MetricsSystem.InstanceType.WORKER, "192_1_1_2", metrics3); - assertEquals(13L, getGauge("metric1")); - assertEquals(20L, getGauge("metric2")); + List metrics3 = Lists.newArrayList(Metric.from("worker.192_1_1_2.metricA", 3)); + mMetricsMaster.workerHeartbeat("192_1_1_2", metrics3); + assertEquals(13L, getGauge("metricA")); + assertEquals(20L, getGauge("metricB")); + } + + @Test + public void testClientHeartbeat() { + mMetricsMaster + .addAggregator(new SumInstancesAggregator(MetricsSystem.InstanceType.CLIENT, "metric1")); + mMetricsMaster + .addAggregator(new SumInstancesAggregator(MetricsSystem.InstanceType.CLIENT, "metric2")); + List metrics1 = Lists.newArrayList(Metric.from("client.192_1_1_1:A.metric1", 10), + Metric.from("client.192_1_1_1:A.metric2", 20)); + mMetricsMaster.clientHeartbeat("A", "192.1.1.1", metrics1); + List metrics2 = Lists.newArrayList(Metric.from("client.192_1_1_1:B.metric1", 15), + Metric.from("client.192_1_1_1:B.metric2", 25)); + mMetricsMaster.clientHeartbeat("B", "192.1.1.1", metrics2); + List metrics3 = Lists.newArrayList(Metric.from("client.192_1_1_2:C.metric1", 1), + Metric.from("client.192_1_1_2:C.metric2", 2)); + mMetricsMaster.clientHeartbeat("C", "192.1.1.2", metrics3); + assertEquals(26L, getGauge("metric1")); + assertEquals(47L, getGauge("metric2")); } private Object getGauge(String name) { diff --git a/core/server/master/src/test/java/alluxio/master/metrics/MetricsStoreTest.java b/core/server/master/src/test/java/alluxio/master/metrics/MetricsStoreTest.java index 9997cf42e95c..67294c093c08 100644 --- a/core/server/master/src/test/java/alluxio/master/metrics/MetricsStoreTest.java +++ b/core/server/master/src/test/java/alluxio/master/metrics/MetricsStoreTest.java @@ -13,7 +13,6 @@ import static org.junit.Assert.assertEquals; -import alluxio.master.metrics.MetricsStore; import alluxio.metrics.Metric; import alluxio.metrics.MetricsSystem; @@ -36,12 +35,29 @@ public void before() { public void putWorkerMetrics() { List metrics1 = Lists.newArrayList(Metric.from("worker.192_1_1_1.metric1", 10), Metric.from("worker.192_1_1_1.metric2", 20)); - mMetricStore.putWorkerMetrics(MetricsSystem.InstanceType.WORKER, "192_1_1_1", metrics1); + mMetricStore.putWorkerMetrics("192_1_1_1", metrics1); List metrics2 = Lists.newArrayList(Metric.from("worker.192_1_1_2.metric1", 1)); - mMetricStore.putWorkerMetrics(MetricsSystem.InstanceType.WORKER, "192_1_1_2", metrics2); + mMetricStore.putWorkerMetrics("192_1_1_2", metrics2); assertEquals( Sets.newHashSet(Metric.from("worker.192_1_1_1.metric1", 10), Metric.from("worker.192_1_1_2.metric1", 1)), mMetricStore.getMetricsByInstanceTypeAndName(MetricsSystem.InstanceType.WORKER, "metric1")); } + + @Test + public void putClientMetrics() { + List metrics1 = Lists.newArrayList(Metric.from("client.192_1_1_1:A.metric1", 10), + Metric.from("client.192_1_1_1:A.metric2", 20)); + mMetricStore.putClientMetrics("192_1_1_1", "A", metrics1); + List metrics2 = Lists.newArrayList(Metric.from("client.192_1_1_2:C.metric1", 1)); + mMetricStore.putClientMetrics("192_1_1_2", "C", metrics2); + List metrics3 = Lists.newArrayList(Metric.from("client.192_1_1_1:B.metric1", 15), + Metric.from("client.192_1_1_1:B.metric2", 25)); + mMetricStore.putClientMetrics("192_1_1_1", "B", metrics3); + assertEquals( + Sets.newHashSet(Metric.from("client.192_1_1_1:A.metric1", 10), + Metric.from("client.192_1_1_2:C.metric1", 1), + Metric.from("client.192_1_1_1:B.metric1", 15)), + mMetricStore.getMetricsByInstanceTypeAndName(MetricsSystem.InstanceType.CLIENT, "metric1")); + } } diff --git a/core/server/worker/src/main/java/alluxio/worker/block/BlockMasterSync.java b/core/server/worker/src/main/java/alluxio/worker/block/BlockMasterSync.java index 1949a849e3d7..8ea81fb38824 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/BlockMasterSync.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/BlockMasterSync.java @@ -21,7 +21,6 @@ import alluxio.exception.InvalidWorkerStateException; import alluxio.heartbeat.HeartbeatExecutor; import alluxio.metrics.Metric; -import alluxio.metrics.Metric; import alluxio.metrics.MetricsSystem; import alluxio.thrift.Command; import alluxio.util.ThreadFactoryUtils; diff --git a/core/server/worker/src/main/java/alluxio/worker/block/RemoteBlockReader.java b/core/server/worker/src/main/java/alluxio/worker/block/RemoteBlockReader.java index 05796f98f1ed..c8b169e7c530 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/RemoteBlockReader.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/RemoteBlockReader.java @@ -80,7 +80,7 @@ public int transferTo(ByteBuf buf) throws IOException { if (mInputStream == null || mInputStream.remaining() <= 0) { return -1; } - int bytesToRead = (int) Math.min((long) buf.writableBytes(), mInputStream.remaining()); + int bytesToRead = (int) Math.min(buf.writableBytes(), mInputStream.remaining()); return buf.writeBytes(mInputStream, bytesToRead); } @@ -107,7 +107,7 @@ private void init() { } WorkerNetAddress address = new WorkerNetAddress().setHost(mDataSource.getHostName()) .setDataPort(mDataSource.getPort()); - mInputStream = BlockInStream.createRemoteBlockInStream(FileSystemContext.INSTANCE, mBlockId, + mInputStream = BlockInStream.createRemoteBlockInStream(FileSystemContext.get(), mBlockId, address, BlockInStream.BlockInStreamSource.REMOTE, mBlockSize, mUfsOptions); mChannel = Channels.newChannel(mInputStream); } diff --git a/minicluster/src/main/java/alluxio/master/AbstractLocalAlluxioCluster.java b/minicluster/src/main/java/alluxio/master/AbstractLocalAlluxioCluster.java index 07cfa51aba41..0d5d25dfb4c5 100644 --- a/minicluster/src/main/java/alluxio/master/AbstractLocalAlluxioCluster.java +++ b/minicluster/src/main/java/alluxio/master/AbstractLocalAlluxioCluster.java @@ -80,7 +80,7 @@ public void start() throws Exception { setupTest(); startMasters(); // Reset the file system context to make sure the correct master RPC port is used. - FileSystemContext.INSTANCE.reset(); + FileSystemContext.get().reset(); startWorkers(); startProxy(); @@ -196,9 +196,9 @@ protected void setupTest() throws IOException { */ public void stop() throws Exception { stopFS(); - ConfigurationTestUtils.resetConfiguration(); reset(); LoginUserTestUtils.resetLoginUser(); + ConfigurationTestUtils.resetConfiguration(); } /** @@ -308,7 +308,7 @@ protected void reset() { * Resets the client pools to the original state. */ protected void resetClientPools() throws IOException { - FileSystemContext.INSTANCE.reset(); + FileSystemContext.get().reset(); } /** diff --git a/shell/src/main/java/alluxio/cli/fs/command/LeaderCommand.java b/shell/src/main/java/alluxio/cli/fs/command/LeaderCommand.java index 480d34f453a3..184211e6cd8c 100644 --- a/shell/src/main/java/alluxio/cli/fs/command/LeaderCommand.java +++ b/shell/src/main/java/alluxio/cli/fs/command/LeaderCommand.java @@ -56,7 +56,7 @@ public void validateArgs(CommandLine cl) throws InvalidArgumentException { @Override public int run(CommandLine cl) { try (CloseableResource client = - FileSystemContext.INSTANCE.acquireMasterClientResource()) { + FileSystemContext.get().acquireMasterClientResource()) { try { InetSocketAddress address = client.get().getAddress(); System.out.println(address.getHostName()); diff --git a/shell/src/main/java/alluxio/cli/fs/command/LoadCommand.java b/shell/src/main/java/alluxio/cli/fs/command/LoadCommand.java index dfd966d0a068..48306c38bae3 100644 --- a/shell/src/main/java/alluxio/cli/fs/command/LoadCommand.java +++ b/shell/src/main/java/alluxio/cli/fs/command/LoadCommand.java @@ -99,7 +99,7 @@ private void load(AlluxioURI filePath, boolean local) throws AlluxioException, I } else { OpenFileOptions options = OpenFileOptions.defaults().setReadType(ReadType.CACHE_PROMOTE); if (local) { - if (!FileSystemContext.INSTANCE.hasLocalWorker()) { + if (!FileSystemContext.get().hasLocalWorker()) { System.out.println("When local option is specified," + " there must be a local worker available"); return; diff --git a/shell/src/main/java/alluxio/cli/fs/command/ReportCommand.java b/shell/src/main/java/alluxio/cli/fs/command/ReportCommand.java index 763e796d58f3..b21b44290bd1 100644 --- a/shell/src/main/java/alluxio/cli/fs/command/ReportCommand.java +++ b/shell/src/main/java/alluxio/cli/fs/command/ReportCommand.java @@ -47,7 +47,7 @@ public String getCommandName() { @Override protected void runPlainPath(AlluxioURI path, CommandLine cl) throws AlluxioException, IOException { - LineageFileSystem.get(FileSystemContext.INSTANCE, LineageContext.INSTANCE).reportLostFile(path); + LineageFileSystem.get(FileSystemContext.get(), LineageContext.INSTANCE).reportLostFile(path); System.out.println(path + " has been reported as lost."); } diff --git a/shell/src/main/java/alluxio/cli/fsadmin/command/ReportCommand.java b/shell/src/main/java/alluxio/cli/fsadmin/command/ReportCommand.java index 805ca03a6b03..ec56cdeb7eb1 100644 --- a/shell/src/main/java/alluxio/cli/fsadmin/command/ReportCommand.java +++ b/shell/src/main/java/alluxio/cli/fsadmin/command/ReportCommand.java @@ -162,7 +162,7 @@ public int run(CommandLine cl) throws IOException { // Check if Alluxio master and client services are running try (CloseableResource client = - FileSystemContext.INSTANCE.acquireMasterClientResource()) { + FileSystemContext.get().acquireMasterClientResource()) { MasterInquireClient inquireClient = null; try { InetSocketAddress address = client.get().getAddress(); diff --git a/tests/src/test/java/alluxio/client/fs/FileSystemMasterIntegrationTest.java b/tests/src/test/java/alluxio/client/fs/FileSystemMasterIntegrationTest.java index bf90b063e718..18a6819af215 100644 --- a/tests/src/test/java/alluxio/client/fs/FileSystemMasterIntegrationTest.java +++ b/tests/src/test/java/alluxio/client/fs/FileSystemMasterIntegrationTest.java @@ -15,7 +15,6 @@ import alluxio.AlluxioURI; import alluxio.AuthenticatedUserRule; -import alluxio.master.file.FileSystemMaster; import alluxio.Configuration; import alluxio.Constants; import alluxio.PropertyKey; @@ -38,6 +37,7 @@ import alluxio.master.MasterClientConfig; import alluxio.master.MasterRegistry; import alluxio.master.block.BlockMaster; +import alluxio.master.file.FileSystemMaster; import alluxio.master.file.meta.TtlIntervalRule; import alluxio.master.file.options.CompleteFileOptions; import alluxio.master.file.options.CreateDirectoryOptions; @@ -131,6 +131,7 @@ public class FileSystemMasterIntegrationTest extends BaseIntegrationTest { @Rule public LocalAlluxioClusterResource mLocalAlluxioClusterResource = new LocalAlluxioClusterResource.Builder() + .setProperty(PropertyKey.USER_METRICS_COLLECTION_ENABLED, false) .setProperty(PropertyKey.MASTER_TTL_CHECKER_INTERVAL_MS, String.valueOf(TTL_CHECKER_INTERVAL_MS)) .setProperty(PropertyKey.WORKER_MEMORY_SIZE, 1000) diff --git a/tests/src/test/java/alluxio/client/fs/ImpersonationIntegrationTest.java b/tests/src/test/java/alluxio/client/fs/ImpersonationIntegrationTest.java index b576bdac5c64..0dda807f400f 100644 --- a/tests/src/test/java/alluxio/client/fs/ImpersonationIntegrationTest.java +++ b/tests/src/test/java/alluxio/client/fs/ImpersonationIntegrationTest.java @@ -72,7 +72,7 @@ public final class ImpersonationIntegrationTest extends BaseIntegrationTest { CustomGroupMapping.class.getName()).build(); @After - public void after() { + public void after() throws Exception { ConfigurationTestUtils.resetConfiguration(); } @@ -90,13 +90,6 @@ public static void beforeClass() { GROUPS.put(HDFS_USER, HDFS_GROUP1 + "," + HDFS_GROUP2); } - @Test - @LocalAlluxioClusterResource.Config(confParams = {IMPERSONATION_GROUPS_CONFIG, "*"}) - public void impersonationUsed() throws Exception { - Configuration.set(PropertyKey.SECURITY_LOGIN_IMPERSONATION_USERNAME, IMPERSONATION_USER); - checkCreateFile(null, IMPERSONATION_USER); - } - @Test @LocalAlluxioClusterResource.Config(confParams = {IMPERSONATION_GROUPS_CONFIG, "*"}) public void impersonationNotUsed() throws Exception { @@ -119,6 +112,13 @@ public void impersonationUsedHdfsUser() throws Exception { checkCreateFile(createHdfsSubject(), HDFS_USER); } + @Test + @LocalAlluxioClusterResource.Config(confParams = {IMPERSONATION_GROUPS_CONFIG, "*"}) + public void impersonationUsed() throws Exception { + Configuration.set(PropertyKey.SECURITY_LOGIN_IMPERSONATION_USERNAME, IMPERSONATION_USER); + checkCreateFile(null, IMPERSONATION_USER); + } + @Test @LocalAlluxioClusterResource.Config( confParams = {IMPERSONATION_GROUPS_CONFIG, IMPERSONATION_GROUP1}) diff --git a/tests/src/test/java/alluxio/client/fs/NettyChannelCloseIntegrationTest.java b/tests/src/test/java/alluxio/client/fs/NettyChannelCloseIntegrationTest.java index 58190510eec1..9f53a7aade6b 100644 --- a/tests/src/test/java/alluxio/client/fs/NettyChannelCloseIntegrationTest.java +++ b/tests/src/test/java/alluxio/client/fs/NettyChannelCloseIntegrationTest.java @@ -37,26 +37,26 @@ public void before() throws Exception { @Test public void closeAsync() throws Exception { for (int i = 0; i < 1000; i++) { - Channel channel = FileSystemContext.INSTANCE.acquireNettyChannel(mWorkerNetAddress); + Channel channel = FileSystemContext.get().acquireNettyChannel(mWorkerNetAddress); Assert.assertTrue(channel.isOpen()); // Note: If you replace closeChannel with channel.close(), this test fails with high // probability. CommonUtils.closeChannel(channel); Assert.assertTrue(!channel.isOpen()); - FileSystemContext.INSTANCE.releaseNettyChannel(mWorkerNetAddress, channel); + FileSystemContext.get().releaseNettyChannel(mWorkerNetAddress, channel); } } @Test public void closeSync() throws Exception { for (int i = 0; i < 1000; i++) { - Channel channel = FileSystemContext.INSTANCE.acquireNettyChannel(mWorkerNetAddress); + Channel channel = FileSystemContext.get().acquireNettyChannel(mWorkerNetAddress); Assert.assertTrue(channel.isOpen()); // Note: If you replace closeChannel with channel.close(), this test fails with high // probability. CommonUtils.closeChannelSync(channel); Assert.assertTrue(!channel.isOpen()); - FileSystemContext.INSTANCE.releaseNettyChannel(mWorkerNetAddress, channel); + FileSystemContext.get().releaseNettyChannel(mWorkerNetAddress, channel); } } } diff --git a/tests/src/test/java/alluxio/client/fs/RemoteReadIntegrationTest.java b/tests/src/test/java/alluxio/client/fs/RemoteReadIntegrationTest.java index e9387a6db0fe..a08ba214a204 100644 --- a/tests/src/test/java/alluxio/client/fs/RemoteReadIntegrationTest.java +++ b/tests/src/test/java/alluxio/client/fs/RemoteReadIntegrationTest.java @@ -271,7 +271,7 @@ public void readTest4() throws Exception { BlockInfo info = blockStore.getInfo(blockId); WorkerNetAddress workerAddr = info.getLocations().get(0).getWorkerAddress(); BlockInStream is = - BlockInStream.create(FileSystemContext.INSTANCE, options.getBlockInfo(blockId), + BlockInStream.create(FileSystemContext.get(), options.getBlockInfo(blockId), workerAddr, BlockInStreamSource.REMOTE, options); byte[] ret = new byte[k]; int value = is.read(); @@ -305,7 +305,7 @@ public void readTest5() throws Exception { BlockInfo info = AlluxioBlockStore.create().getInfo(blockId); WorkerNetAddress workerAddr = info.getLocations().get(0).getWorkerAddress(); BlockInStream is = - BlockInStream.create(FileSystemContext.INSTANCE, options.getBlockInfo(blockId), + BlockInStream.create(FileSystemContext.get(), options.getBlockInfo(blockId), workerAddr, BlockInStreamSource.REMOTE, options); byte[] ret = new byte[k]; int read = is.read(ret); @@ -333,7 +333,7 @@ public void readTest6() throws Exception { BlockInfo info = AlluxioBlockStore.create().getInfo(blockId); WorkerNetAddress workerAddr = info.getLocations().get(0).getWorkerAddress(); BlockInStream is = - BlockInStream.create(FileSystemContext.INSTANCE, options.getBlockInfo(blockId), + BlockInStream.create(FileSystemContext.get(), options.getBlockInfo(blockId), workerAddr, BlockInStreamSource.REMOTE, options); byte[] ret = new byte[k / 2]; int read = 0; @@ -577,7 +577,7 @@ public void remoteReadLock() throws Exception { WorkerNetAddress workerAddr = info.getLocations().get(0).getWorkerAddress(); BlockInStream is = - BlockInStream.create(FileSystemContext.INSTANCE, options.getBlockInfo(blockId), + BlockInStream.create(FileSystemContext.get(), options.getBlockInfo(blockId), workerAddr, BlockInStreamSource.REMOTE, options); Assert.assertEquals(0, is.read()); mFileSystem.delete(uri); @@ -594,7 +594,7 @@ public void remoteReadLock() throws Exception { // Try to create an in stream again, and it should fail. BlockInStream is2 = null; try { - is2 = BlockInStream.create(FileSystemContext.INSTANCE, options.getBlockInfo(blockId), + is2 = BlockInStream.create(FileSystemContext.get(), options.getBlockInfo(blockId), workerAddr, BlockInStreamSource.REMOTE, options); } catch (NotFoundException e) { // Expected since the file has been deleted. diff --git a/tests/src/test/java/alluxio/client/hadoop/HdfsFileInputStreamIntegrationTest.java b/tests/src/test/java/alluxio/client/hadoop/HdfsFileInputStreamIntegrationTest.java index 7af76761bf03..80944e5d8489 100644 --- a/tests/src/test/java/alluxio/client/hadoop/HdfsFileInputStreamIntegrationTest.java +++ b/tests/src/test/java/alluxio/client/hadoop/HdfsFileInputStreamIntegrationTest.java @@ -12,7 +12,6 @@ package alluxio.client.hadoop; import alluxio.AlluxioURI; -import alluxio.hadoop.HadoopClientTestUtils; import alluxio.PropertyKey; import alluxio.client.ReadType; import alluxio.client.WriteType; @@ -22,6 +21,7 @@ import alluxio.client.file.URIStatus; import alluxio.exception.AlluxioException; import alluxio.exception.PreconditionMessage; +import alluxio.hadoop.HadoopClientTestUtils; import alluxio.hadoop.HdfsFileInputStream; import alluxio.testutils.BaseIntegrationTest; import alluxio.testutils.LocalAlluxioClusterResource; @@ -75,7 +75,7 @@ public final void before() throws Exception { mFileSystem = sLocalAlluxioClusterResource.get().getClient(); FileSystemTestUtils .createByteFile(mFileSystem, IN_MEMORY_FILE, WriteType.CACHE_THROUGH, FILE_LEN); - mInMemInputStream = new HdfsFileInputStream(FileSystemContext.INSTANCE, + mInMemInputStream = new HdfsFileInputStream(FileSystemContext.get(), new AlluxioURI(IN_MEMORY_FILE), null); } @@ -83,7 +83,7 @@ private void createUfsInStream(ReadType readType) throws Exception { String defaultReadType = alluxio.Configuration.get(PropertyKey.USER_FILE_READ_TYPE_DEFAULT); alluxio.Configuration.set(PropertyKey.USER_FILE_READ_TYPE_DEFAULT, readType.name()); FileSystemTestUtils.createByteFile(mFileSystem, UFS_ONLY_FILE, WriteType.THROUGH, FILE_LEN); - mUfsInputStream = new HdfsFileInputStream(FileSystemContext.INSTANCE, + mUfsInputStream = new HdfsFileInputStream(FileSystemContext.get(), new AlluxioURI(UFS_ONLY_FILE), null); alluxio.Configuration.set(PropertyKey.USER_FILE_READ_TYPE_DEFAULT, defaultReadType); } diff --git a/tests/src/test/java/alluxio/client/rest/LineageMasterClientRestApiTest.java b/tests/src/test/java/alluxio/client/rest/LineageMasterClientRestApiTest.java index e4f3e3211405..d0d92eaab602 100644 --- a/tests/src/test/java/alluxio/client/rest/LineageMasterClientRestApiTest.java +++ b/tests/src/test/java/alluxio/client/rest/LineageMasterClientRestApiTest.java @@ -54,7 +54,7 @@ public void before() throws Exception { mHostname = mResource.get().getHostname(); mPort = mResource.get().getLocalAlluxioMaster().getMasterProcess().getWebAddress().getPort(); mServicePrefix = LineageMasterClientRestServiceHandler.SERVICE_PREFIX; - mLineageClient = LineageFileSystem.get(FileSystemContext.INSTANCE, LineageContext.INSTANCE); + mLineageClient = LineageFileSystem.get(FileSystemContext.get(), LineageContext.INSTANCE); mMasterProcess = mResource.get().getLocalAlluxioMaster().getMasterProcess(); } diff --git a/tests/src/test/java/alluxio/server/ft/journal/JournalShutdownIntegrationTest.java b/tests/src/test/java/alluxio/server/ft/journal/JournalShutdownIntegrationTest.java index 75432e7cb992..c0290c2b9dae 100644 --- a/tests/src/test/java/alluxio/server/ft/journal/JournalShutdownIntegrationTest.java +++ b/tests/src/test/java/alluxio/server/ft/journal/JournalShutdownIntegrationTest.java @@ -12,16 +12,13 @@ package alluxio.server.ft.journal; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyString; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import alluxio.AlluxioURI; import alluxio.AuthenticatedUserRule; -import alluxio.master.LocalAlluxioCluster; -import alluxio.master.MasterRegistry; -import alluxio.master.MultiMasterLocalAlluxioCluster; import alluxio.Configuration; import alluxio.ConfigurationRule; import alluxio.ConfigurationTestUtils; @@ -31,6 +28,9 @@ import alluxio.client.WriteType; import alluxio.client.file.FileSystem; import alluxio.client.file.FileSystemContext; +import alluxio.master.LocalAlluxioCluster; +import alluxio.master.MasterRegistry; +import alluxio.master.MultiMasterLocalAlluxioCluster; import alluxio.multi.process.MultiProcessCluster; import alluxio.multi.process.MultiProcessCluster.DeployMode; import alluxio.testutils.BaseIntegrationTest; @@ -67,6 +67,7 @@ public class JournalShutdownIntegrationTest extends BaseIntegrationTest { public static SystemPropertyRule sDisableHdfsCacheRule = new SystemPropertyRule("fs.hdfs.impl.disable.cache", "true"); + @Override protected List rules() { return Arrays.asList( new AuthenticatedUserRule("test"), @@ -97,7 +98,7 @@ public final void before() throws Exception { public final void after() throws Exception { mExecutorsForClient.shutdown(); ConfigurationTestUtils.resetConfiguration(); - FileSystemContext.INSTANCE.reset(); + FileSystemContext.get().reset(); } @Test