Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Revert "Make RCA framework NOT use ClusterDetailsEventProcessor (#274)"
Browse files Browse the repository at this point in the history
This reverts commit 95c19b6.
  • Loading branch information
Sid Narayan committed Jul 24, 2020
1 parent 7697de3 commit 59c461e
Show file tree
Hide file tree
Showing 51 changed files with 388 additions and 826 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public class PerformanceAnalyzerApp {
public static final BlockingQueue<PAThreadException> exceptionQueue =
new ArrayBlockingQueue<>(EXCEPTION_QUEUE_LENGTH);

public static void main(String[] args) {
public static void main(String[] args) throws Exception {
PluginSettings settings = PluginSettings.instance();
StatsCollector.STATS_TYPE = "agent-stats-metadata";
METRIC_COLLECTOR_EXECUTOR.addScheduledMetricCollector(StatsCollector.instance());
Expand All @@ -112,19 +112,16 @@ public static void main(String[] args) {

final GRPCConnectionManager connectionManager = new GRPCConnectionManager(
settings.getHttpsEnabled());
AppContext appContext = new AppContext();
final ClientServers clientServers = createClientServers(connectionManager, appContext);
final ClientServers clientServers = createClientServers(connectionManager);
startErrorHandlingThread();

startReaderThread(appContext);
startReaderThread();
startGrpcServerThread(clientServers.getNetServer());
startWebServerThread(clientServers.getHttpServer());
startRcaTopLevelThread(clientServers, connectionManager, appContext);
startRcaTopLevelThread(clientServers, connectionManager);
}

private static void startRcaTopLevelThread(final ClientServers clientServers,
final GRPCConnectionManager connectionManager,
final AppContext appContext) {
final GRPCConnectionManager connectionManager) {
rcaController =
new RcaController(
THREAD_PROVIDER,
Expand All @@ -133,8 +130,7 @@ private static void startRcaTopLevelThread(final ClientServers clientServers,
clientServers,
Util.DATA_DIR,
RcaConsts.RCA_STATE_CHECK_INTERVAL_IN_MS,
RcaConsts.nodeRolePollerPeriodicityInSeconds * 1000,
appContext
RcaConsts.nodeRolePollerPeriodicityInSeconds * 1000
);

Thread rcaControllerThread = THREAD_PROVIDER.createThreadForRunnable(() -> rcaController.run(),
Expand Down Expand Up @@ -192,13 +188,13 @@ private static void startGrpcServerThread(final NetServer server) {
grpcServerThread.start();
}

private static void startReaderThread(final AppContext appContext) {
private static void startReaderThread() {
PluginSettings settings = PluginSettings.instance();
final Thread readerThread = THREAD_PROVIDER.createThreadForRunnable(() -> {
while (true) {
try {
ReaderMetricsProcessor mp =
new ReaderMetricsProcessor(settings.getMetricsLocation(), true, appContext);
new ReaderMetricsProcessor(settings.getMetricsLocation(), true);
ReaderMetricsProcessor.setCurrentInstance(mp);
mp.run();
} catch (Throwable e) {
Expand All @@ -225,8 +221,7 @@ private static void startReaderThread(final AppContext appContext) {
*
* @return gRPC client and the gRPC server and the httpServer wrapped in a class.
*/
public static ClientServers createClientServers(final GRPCConnectionManager connectionManager,
final AppContext appContext) {
public static ClientServers createClientServers(final GRPCConnectionManager connectionManager) {
boolean useHttps = PluginSettings.instance().getHttpsEnabled();

NetServer netServer = new NetServer(Util.RPC_PORT, 1, useHttps);
Expand All @@ -236,9 +231,7 @@ public static ClientServers createClientServers(final GRPCConnectionManager conn
netServer.setMetricsHandler(new MetricsServerHandler());
HttpServer httpServer =
PerformanceAnalyzerWebServer.createInternalServer(PluginSettings.instance());
httpServer.createContext(
QUERY_URL,
new QueryMetricsRequestHandler(netClient, metricsRestUtil, appContext));
httpServer.createContext(QUERY_URL, new QueryMetricsRequestHandler(netClient, metricsRestUtil));

return new ClientServers(httpServer, netServer, netClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RCA_MUTE_ERROR_METRIC;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.ClientServers;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerThreads;
Expand All @@ -36,7 +35,6 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Stats;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.ThresholdMain;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaRuntimeMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.net.NodeStateManager;
Expand All @@ -50,9 +48,10 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.persistence.PersistenceFactory;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.RCAScheduler;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.RcaSchedulerState;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor.NodeDetails;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rest.QueryRcaRequestHandler;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.threads.ThreadProvider;
import com.google.common.annotations.VisibleForTesting;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
Expand All @@ -67,6 +66,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -120,19 +120,15 @@ public class RcaController {
private AtomicReference<ExecutorService> networkThreadPoolReference = new AtomicReference<>();
private ReceivedFlowUnitStore receivedFlowUnitStore;

private final AppContext appContext;

public RcaController(
final ThreadProvider threadProvider,
final ScheduledExecutorService netOpsExecutorService,
final GRPCConnectionManager grpcConnectionManager,
final ClientServers clientServers,
final String rca_enabled_conf_location,
final long rcaStateCheckIntervalMillis,
final long nodeRoleCheckPeriodicityMillis,
final AppContext appContext) {
final long nodeRoleCheckPeriodicityMillis) {
this.threadProvider = threadProvider;
this.appContext = appContext;
this.netOpsExecutorService = netOpsExecutorService;
this.rcaNetClient = clientServers.getNetClient();
this.rcaNetServer = clientServers.getNetServer();
Expand All @@ -141,8 +137,8 @@ public RcaController(
netPersistor = new NetPersistor();
this.useHttps = PluginSettings.instance().getHttpsEnabled();
subscriptionManager = new SubscriptionManager(grpcConnectionManager);
nodeStateManager = new NodeStateManager(this.appContext);
queryRcaRequestHandler = new QueryRcaRequestHandler(this.appContext);
nodeStateManager = new NodeStateManager();
queryRcaRequestHandler = new QueryRcaRequestHandler();
this.rcaScheduler = null;
this.rcaStateCheckIntervalMillis = rcaStateCheckIntervalMillis;
this.roleCheckPeriodicity = nodeRoleCheckPeriodicityMillis;
Expand All @@ -166,21 +162,16 @@ private void start() {
receivedFlowUnitStore = new ReceivedFlowUnitStore(rcaConf.getPerVertexBufferLength());
WireHopper net =
new WireHopper(nodeStateManager, rcaNetClient, subscriptionManager,
networkThreadPoolReference, receivedFlowUnitStore, appContext);
networkThreadPoolReference, receivedFlowUnitStore);
this.rcaScheduler =
new RCAScheduler(connectedComponents,
db,
rcaConf,
thresholdMain,
persistable,
net,
appContext);
new RCAScheduler(connectedComponents, db, rcaConf, thresholdMain, persistable, net);

rcaNetServer.setSendDataHandler(new PublishRequestHandler(
nodeStateManager, receivedFlowUnitStore, networkThreadPoolReference));
rcaNetServer.setSubscribeHandler(
new SubscribeServerHandler(subscriptionManager, networkThreadPoolReference));

rcaScheduler.setRole(currentRole);
Thread rcaSchedulerThread = threadProvider.createThreadForRunnable(() -> rcaScheduler.start(),
PerformanceAnalyzerThreads.RCA_SCHEDULER);

Expand Down Expand Up @@ -232,8 +223,8 @@ public void run() {
readRcaEnabledFromConf();
if (rcaEnabled && tick % nodeRoleCheckInTicks == 0) {
tick = 0;
final InstanceDetails nodeDetails = appContext.getMyInstanceDetails();
if (nodeDetails.getRole() != NodeRole.UNKNOWN) {
final NodeDetails nodeDetails = ClusterDetailsEventProcessor.getCurrentNodeDetails();
if (nodeDetails != null) {
checkUpdateNodeRole(nodeDetails);
}
}
Expand All @@ -259,9 +250,9 @@ public void run() {
}
}

private void checkUpdateNodeRole(final InstanceDetails currentNode) {
final NodeRole currentNodeRole = currentNode.getRole();
boolean isMasterNode = currentNode.getIsMaster();
private void checkUpdateNodeRole(final NodeDetails currentNode) {
final NodeRole currentNodeRole = NodeRole.valueOf(currentNode.getRole());
boolean isMasterNode = currentNode.getIsMasterNode();
currentRole = isMasterNode ? NodeRole.ELECTED_MASTER : currentNodeRole;
}

Expand Down Expand Up @@ -409,11 +400,6 @@ public NodeRole getCurrentRole() {
return currentRole;
}

@VisibleForTesting
public AppContext getAppContext() {
return this.appContext;
}

public RCAScheduler getRcaScheduler() {
return rcaScheduler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
Expand Down Expand Up @@ -69,11 +66,6 @@ public abstract class Node<T extends GenericFlowUnit> {
*/
private Map<String, String> tags;

/**
* A view of the instanceDetails that the RCAs can have access to.
*/
private AppContext appContext;

Node(int level, long evaluationIntervalSeconds) {
this.downStreams = new ArrayList<>();
this.level = level;
Expand Down Expand Up @@ -195,33 +187,4 @@ public void setLocalFlowUnit(T localFlowUnit) {
public void readRcaConf(RcaConf conf) {
return;
}

public void setAppContext(final AppContext appContext) {
this.appContext = appContext;
}

public InstanceDetails getInstanceDetails() {
InstanceDetails ret = new InstanceDetails(AllMetrics.NodeRole.UNKNOWN);
if (this.appContext != null) {
ret = this.appContext.getMyInstanceDetails();
}
return ret;
}

public List<InstanceDetails> getAllClusterInstances() {
List<InstanceDetails> ret = Collections.EMPTY_LIST;

if (this.appContext != null) {
ret = this.appContext.getAllClusterInstances();
}
return ret;
}

public List<InstanceDetails> getDataNodeInstances() {
List<InstanceDetails> ret = Collections.EMPTY_LIST;
if (this.appContext != null) {
return this.appContext.getDataNodeInstances();
}
return ret;
}
}
Loading

0 comments on commit 59c461e

Please sign in to comment.