From 962031dcd4ed38fdbd678a1e51b53e35635d6780 Mon Sep 17 00:00:00 2001 From: Joydeep Sinha Date: Mon, 20 Jul 2020 10:55:09 -0700 Subject: [PATCH] rca it first cut, after removing the refactoring changes in the existing code --- .../PerformanceAnalyzerApp.java | 6 +- .../PerformanceAnalyzerWebServer.java | 1 + .../rca/RcaController.java | 80 ++-- .../rca/framework/core/ConfJsonWrapper.java | 37 +- .../framework/core/ConnectedComponent.java | 14 +- .../rca/persistence/Persistable.java | 8 + .../rca/persistence/PersistorBase.java | 4 + .../rca/persistence/SQLitePersistor.java | 53 +++ .../rca/scheduler/RCAScheduler.java | 2 +- .../rest/QueryRcaRequestHandler.java | 23 +- .../rca/RcaControllerTest.java | 12 +- .../rca/integTests/TestApi.java | 89 ++++ .../rca/integTests/framework/Cluster.java | 308 +++++++++++++ .../rca/integTests/framework/Host.java | 420 ++++++++++++++++++ .../rca/integTests/framework/RcaConfIt.java | 50 +++ .../integTests/framework/RcaControllerIt.java | 107 +++++ .../framework/RcaItMetricsDBProvider.java | 68 +++ .../integTests/framework/TestEnvironment.java | 155 +++++++ .../framework/annotations/AMetric.java | 20 + .../framework/annotations/ARcaConf.java | 23 + .../framework/annotations/ARcaGraph.java | 12 + .../framework/annotations/Table.java | 9 + .../framework/annotations/Tuple.java | 17 + .../framework/configs/ClusterType.java | 22 + .../integTests/framework/configs/Consts.java | 17 + .../integTests/framework/configs/HostTag.java | 10 + .../framework/configs/RcaState.java | 16 + .../DedicatedMasterWithHttpRunner.java | 102 +++++ .../framework/runners/IRcaItRunner.java | 61 +++ .../rca/integTests/tests/RcaItPoc.java | 77 ++++ .../integTests/tests/SimpleAnalysisGraph.java | 190 ++++++++ .../resources/rca/rca_elected_master.conf | 3 +- src/test/resources/rca/rca_master.conf | 3 +- 33 files changed, 1956 insertions(+), 63 deletions(-) create mode 100644 
src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/TestApi.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Cluster.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Host.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/RcaConfIt.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/RcaControllerIt.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/RcaItMetricsDBProvider.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/TestEnvironment.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/AMetric.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ARcaConf.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ARcaGraph.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/Table.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/Tuple.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/ClusterType.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/Consts.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/HostTag.java create 
mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/RcaState.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/DedicatedMasterWithHttpRunner.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/IRcaItRunner.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/RcaItPoc.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/SimpleAnalysisGraph.java diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerApp.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerApp.java index 448fec44c..b3a791e4e 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerApp.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerApp.java @@ -28,6 +28,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.NetClient; import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.NetServer; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaController; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.MetricsDBProvider; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.JvmMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics; @@ -136,12 +137,13 @@ private static void startRcaTopLevelThread(final ClientServers clientServers, Util.DATA_DIR, RcaConsts.RCA_STATE_CHECK_INTERVAL_IN_MS, 
RcaConsts.nodeRolePollerPeriodicityInSeconds * 1000, - appContext + appContext, new MetricsDBProvider() ); startRcaTopLevelThread(rcaController, threadProvider); } - public static Thread startRcaTopLevelThread(final RcaController rcaController1, final ThreadProvider threadProvider) { + public static Thread startRcaTopLevelThread(final RcaController rcaController1, + final ThreadProvider threadProvider) { Thread rcaControllerThread = threadProvider.createThreadForRunnable(() -> rcaController1.run(), PerformanceAnalyzerThreads.RCA_CONTROLLER); rcaControllerThread.start(); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerWebServer.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerWebServer.java index b8c8b5eea..03093b217 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerWebServer.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerWebServer.java @@ -58,6 +58,7 @@ public static HttpServer createInternalServer(String portFromSetting, String hos server.setExecutor(Executors.newCachedThreadPool()); return server; } catch (java.net.BindException ex) { + LOG.error("Could not create HttpServer on port {}", internalPort, ex); Runtime.getRuntime().halt(1); } catch (Exception ex) { ex.printStackTrace(); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java index 44a0b6473..5ae413337 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java @@ -96,6 +96,7 @@ public class RcaController { // This needs to be volatile as the NodeRolePoller writes it but the Nanny reads it. 
protected volatile NodeRole currentRole = NodeRole.UNKNOWN; + private volatile List connectedComponents; private final ThreadProvider threadProvider; private RCAScheduler rcaScheduler; @@ -125,7 +126,9 @@ public class RcaController { private final AppContext appContext; - protected Queryable dbProvider = null; + protected volatile Queryable dbProvider = null; + + private volatile Persistable persistenceProvider; public RcaController( final ThreadProvider threadProvider, @@ -135,7 +138,8 @@ public RcaController( final String rca_enabled_conf_location, final long rcaStateCheckIntervalMillis, final long nodeRoleCheckPeriodicityMillis, - final AppContext appContext) { + final AppContext appContext, + final Queryable dbProvider) { this.threadProvider = threadProvider; this.appContext = appContext; this.netOpsExecutorService = netOpsExecutorService; @@ -152,6 +156,9 @@ public RcaController( this.rcaStateCheckIntervalMillis = rcaStateCheckIntervalMillis; this.roleCheckPeriodicity = nodeRoleCheckPeriodicityMillis; this.deliberateInterrupt = false; + this.connectedComponents = null; + this.dbProvider = dbProvider; + this.persistenceProvider = null; } @VisibleForTesting @@ -163,6 +170,7 @@ public RcaController() { rcaStateCheckIntervalMillis = 0; roleCheckPeriodicity = 0; appContext = null; + this.persistenceProvider = null; } protected List getRcaGraphComponents( @@ -178,35 +186,32 @@ private void start() { try { Objects.requireNonNull(subscriptionManager); Objects.requireNonNull(rcaConf); + if (dbProvider == null) { + return; + } subscriptionManager.setCurrentLocus(rcaConf.getTagMap().get("locus")); - List connectedComponents = getRcaGraphComponents(rcaConf); + this.connectedComponents = getRcaGraphComponents(rcaConf); // Mute the rca nodes after the graph creation and before the scheduler start readAndUpdateMutesRcasDuringStart(); - Queryable db; - if (dbProvider == null) { - db = new MetricsDBProvider(); - } else { - db = dbProvider; - } ThresholdMain thresholdMain = new 
ThresholdMain(RcaConsts.THRESHOLDS_PATH, rcaConf); - Persistable persistable = PersistenceFactory.create(rcaConf); + persistenceProvider = PersistenceFactory.create(rcaConf); networkThreadPoolReference .set(RcaControllerHelper.buildNetworkThreadPool(rcaConf.getNetworkQueueLength())); addRcaRequestHandler(); - queryRcaRequestHandler.setPersistable(persistable); + queryRcaRequestHandler.setPersistable(persistenceProvider); receivedFlowUnitStore = new ReceivedFlowUnitStore(rcaConf.getPerVertexBufferLength()); WireHopper net = new WireHopper(nodeStateManager, rcaNetClient, subscriptionManager, networkThreadPoolReference, receivedFlowUnitStore, appContext); this.rcaScheduler = new RCAScheduler(connectedComponents, - db, + dbProvider, rcaConf, thresholdMain, - persistable, + persistenceProvider, net, appContext); @@ -227,8 +232,9 @@ private void start() { | MalformedConfig | SQLException | IOException e) { - LOG.error("Couldn't build connected components or persistable.. Ran into {}", e.getMessage()); - e.printStackTrace(); + LOG.error("Couldn't build connected components or persistable..", e); + } catch (Exception ex) { + LOG.error("Couldn't start RcaController", ex); } } @@ -257,6 +263,10 @@ private void restart() { StatsCollector.instance().logMetric(RcaConsts.RCA_SCHEDULER_RESTART_METRIC); } + protected RcaConf getRcaConfForMyRole(NodeRole role) { + return RcaControllerHelper.pickRcaConfForRole(role); + } + public void run() { long tick = 1; long nodeRoleCheckInTicks = roleCheckPeriodicity / rcaStateCheckIntervalMillis; @@ -274,7 +284,7 @@ public void run() { // If RCA is enabled, update Analysis graph with Muted RCAs value if (rcaEnabled) { - rcaConf = RcaControllerHelper.pickRcaConfForRole(currentRole); + rcaConf = getRcaConfForMyRole(currentRole); LOG.debug("Updating Analysis Graph with Muted RCAs"); readAndUpdateMutesRcas(); } @@ -285,14 +295,17 @@ public void run() { Thread.sleep(rcaStateCheckIntervalMillis - duration); } } catch (InterruptedException ie) { - if 
(!deliberateInterrupt) { - LOG.error("RCA controller thread was interrupted. Reason: {}", ie.getMessage()); - LOG.error(ie); + if (deliberateInterrupt) { + // This should only happen in case of tests. So, it's okay for this log level to be info. + LOG.info("RcaController thread interrupted.."); + } else { + LOG.error("RCA controller thread was interrupted.", ie); } break; } tick++; } + LOG.info("RcaController exits.."); } private void checkUpdateNodeRole(final InstanceDetails currentNode) { @@ -311,7 +324,12 @@ private void readRcaEnabledFromConf() { () -> { try (Scanner sc = new Scanner(filePath)) { String nextLine = sc.nextLine(); - rcaEnabled = Boolean.parseBoolean(nextLine); + boolean oldVal = rcaEnabled; + boolean newVal = Boolean.parseBoolean(nextLine); + if (oldVal != newVal) { + rcaEnabled = newVal; + LOG.info("RCA enabled changed from {} to {}", oldVal, newVal); + } } catch (IOException e) { LOG.error("Error reading file '{}': {}", filePath.toString(), e); e.printStackTrace(); @@ -350,8 +368,10 @@ private void readAndUpdateMutesRcasDuringStart() { */ private void readAndUpdateMutesRcas() { try { - if (ConnectedComponent.getNodeNames().isEmpty()) { - LOG.info("Analysis graph not initialized/has been reset; returning."); + Set allNodes = ConnectedComponent.getNodesForAllComponents(this.connectedComponents); + + if (allNodes.isEmpty()) { + LOG.debug("Analysis graph not initialized/has been reset; returning."); return; } @@ -363,18 +383,18 @@ private void readAndUpdateMutesRcas() { LOG.info("RCAs provided for muting : {}", rcasForMute); // Update rcasForMute to retain only valid RCAs - rcasForMute.retainAll(ConnectedComponent.getNodeNames()); + rcasForMute.retainAll(allNodes); // If rcasForMute post validation is empty but rcaConf.getMutedRcaList() is not empty // all the input RCAs are incorrect. 
if (rcasForMute.isEmpty() && !rcaConf.getMutedRcaList().isEmpty()) { if (lastModifiedTimeInMillisInMemory == 0) { LOG.error("Removing Incorrect RCA(s): {} provided before RCA Scheduler start. Valid RCAs: {}.", - rcaConf.getMutedRcaList(), ConnectedComponent.getNodeNames()); + rcaConf.getMutedRcaList(), allNodes); } else { LOG.error("Incorrect RCA(s): {}, cannot be muted. Valid RCAs: {}, Muted RCAs: {}", - rcaConf.getMutedRcaList(), ConnectedComponent.getNodeNames(), Stats.getInstance().getMutedGraphNodes()); + rcaConf.getMutedRcaList(), allNodes, Stats.getInstance().getMutedGraphNodes()); return; } } @@ -461,4 +481,14 @@ private void addRcaRequestHandler() { public void setDeliberateInterrupt() { deliberateInterrupt = true; } + + @VisibleForTesting + public List getConnectedComponents() { + return connectedComponents; + } + + @VisibleForTesting + public Persistable getPersistenceProvider() { + return persistenceProvider; + } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConfJsonWrapper.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConfJsonWrapper.java index e059a6ba9..f1f24411c 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConfJsonWrapper.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConfJsonWrapper.java @@ -27,8 +27,19 @@ // TODO: There should be a validation for the expected fields. 
@JsonIgnoreProperties(ignoreUnknown = true) class ConfJsonWrapper { + public static final String RCA_STORE_LOC = "rca-store-location"; + public static final String THRESHOLD_STORE_LOC = "threshold-store-location"; + public static final String NEW_RCA_CHECK_MINS = "new-rca-check-minutes"; + public static final String NEW_THRESHOLDS_CHECK_MINS = "new-threshold-check-minutes"; + public static final String TAGS = "tags"; + public static final String REMOTE_PEERS = "remote-peers"; + public static final String DATASTORE = "datastore"; + public static final String ANALYSIS_GRAPH_IMPL = "analysis-graph-implementor"; + public static final String NETWORK_QUEUE_LEN = "network-queue-length"; + public static final String MAX_FLOW_UNIT_PER_VERTEX = "max-flow-units-per-vertex-buffer"; + public static final String RCA_CONFIG_SETTINGS = "rca-config-settings"; + public static final String MUTED_RCAS = "muted-rcas"; - private static final Logger LOG = LogManager.getLogger(ConfJsonWrapper.class); private final String rcaStoreLoc; private final String thresholdStoreLoc; private final long newRcaCheckPeriocicityMins; @@ -96,18 +107,18 @@ Map getRcaConfigSettings() { } ConfJsonWrapper( - @JsonProperty("rca-store-location") String rcaStoreLoc, - @JsonProperty("threshold-store-location") String thresholdStoreLoc, - @JsonProperty("new-rca-check-minutes") long newRcaCheckPeriocicityMins, - @JsonProperty("new-threshold-check-minutes") long newThresholdCheckPeriodicityMins, - @JsonProperty("tags") Map tags, - @JsonProperty("remote-peers") List peers, - @JsonProperty("datastore") Map datastore, - @JsonProperty("analysis-graph-implementor") String analysisGraphEntryPoint, - @JsonProperty("network-queue-length") int networkQueueLength, - @JsonProperty("max-flow-units-per-vertex-buffer") int perVertexBufferLength, - @JsonProperty("rca-config-settings") Map rcaConfigSettings, - @JsonProperty("muted-rcas") List mutedRcas) { + @JsonProperty(RCA_STORE_LOC) String rcaStoreLoc, + 
@JsonProperty(THRESHOLD_STORE_LOC) String thresholdStoreLoc, + @JsonProperty(NEW_RCA_CHECK_MINS) long newRcaCheckPeriocicityMins, + @JsonProperty(NEW_THRESHOLDS_CHECK_MINS) long newThresholdCheckPeriodicityMins, + @JsonProperty(TAGS) Map tags, + @JsonProperty(REMOTE_PEERS) List peers, + @JsonProperty(DATASTORE) Map datastore, + @JsonProperty(ANALYSIS_GRAPH_IMPL) String analysisGraphEntryPoint, + @JsonProperty(NETWORK_QUEUE_LEN) int networkQueueLength, + @JsonProperty(MAX_FLOW_UNIT_PER_VERTEX) int perVertexBufferLength, + @JsonProperty(RCA_CONFIG_SETTINGS) Map rcaConfigSettings, + @JsonProperty(MUTED_RCAS) List mutedRcas) { this.creationTime = System.currentTimeMillis(); this.rcaStoreLoc = rcaStoreLoc; this.thresholdStoreLoc = thresholdStoreLoc; diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConnectedComponent.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConnectedComponent.java index 016bf117b..6f0a65d7a 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConnectedComponent.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConnectedComponent.java @@ -29,7 +29,7 @@ public class ConnectedComponent { /* The elements in the inner list can be executed in parallel. Two inner lists have to be executed in order. 
*/ private List>> dependencyOrderedNodes; - private static Set nodeNames = new HashSet<>(); + private Set nodeNames = new HashSet<>(); private int graphId; public Set> getAllNodes() { @@ -117,8 +117,18 @@ public List>> getAllNodesByDependencyOrder() { return dependencyOrderedNodes; } - public static Set getNodeNames() { + public Set getNodeNames() { return nodeNames; } + public static Set getNodesForAllComponents(List connectedComponentList) { + Set allNodes = new HashSet<>(); + + if (connectedComponentList != null) { + for (ConnectedComponent connectedComponent : connectedComponentList) { + allNodes.addAll(connectedComponent.getNodeNames()); + } + } + return allNodes; + } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/Persistable.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/Persistable.java index 8293c68ca..354d2c7ce 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/Persistable.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/Persistable.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.sql.SQLException; import java.util.List; +import org.jooq.Record; +import org.jooq.Result; public interface Persistable { /** @@ -54,4 +56,10 @@ public interface Persistable { void write(Node node, T flowUnit) throws SQLException, IOException; void close() throws SQLException; + + List> getRecordsForAllTables(); + + Result getRecordsForTable(String tableName); + + List getAllPersistedRcas(); } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java index e953aa60d..47a5466b4 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java +++ 
b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java @@ -37,6 +37,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jooq.Field; +import org.jooq.Record; +import org.jooq.Result; import org.jooq.exception.DataAccessException; // TODO: Scheme to rotate the current file and garbage collect older files. @@ -113,6 +115,8 @@ abstract void createTable( abstract void createNewDSLContext(); + public abstract List> getRecordsForAllTables(); + // Not required for now. @Override public List read(Node node) { diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java index 9d4042b6f..065689f84 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java @@ -34,10 +34,14 @@ import com.google.gson.JsonSyntaxException; import java.io.IOException; import java.sql.SQLException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nullable; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jooq.CreateTableConstraintStep; @@ -46,6 +50,7 @@ import org.jooq.InsertValuesStepN; import org.jooq.JSONFormat; import org.jooq.Record; +import org.jooq.Record1; import org.jooq.Result; import org.jooq.SQLDialect; import org.jooq.SelectJoinStep; @@ -153,6 +158,35 @@ synchronized String readTables() { return tablesObject.toString(); } + public synchronized List> getRecordsForAllTables() { + List> results = new ArrayList<>(); + 
super.tableNames.forEach( + table -> results.add(getRecords(table)) + ); + return results; + } + + @Override + public Result getRecordsForTable(String tableName) { + return getRecords(tableName); + } + + + @Override + public List getAllPersistedRcas() { + List tables = new ArrayList<>(); + try { + tables = + (List) create.selectDistinct(ResourceFlowUnitFieldValue.RCA_NAME_FILELD.getField()) + .from(ResourceFlowUnit.RCA_TABLE_NAME) + .fetch(0).stream().collect(Collectors.toList()); + } catch(DataAccessException dex) { + + } + return tables; + } + + //read table content and convert it into JSON format private synchronized String readTable(String tableName) { String tableStr; @@ -174,6 +208,25 @@ private synchronized String readTable(String tableName) { return tableStr; } + private synchronized @Nullable Result getRecords(String tableName) { + try { + Result result; + if (tableName.equals(ResourceFlowUnit.RCA_TABLE_NAME)) { + result = create.select() + .from(tableName) + .orderBy(ResourceFlowUnitFieldValue.RCA_NAME_FILELD.getField()) + .fetch(); + } else { + result = create.select().from(tableName).fetch(); + } + return result; + } catch (DataAccessException e) { + LOG.error("Fail to read table {}", tableName); + } + return null; + } + + /** * FullTemperatureSummary is not one single Rca, instead it is a conglomeration of * temperature across all dimensions. 
Therefore, it iterates over all the dimensional tables diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java index 46f1518f7..01a46dfeb 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java @@ -65,7 +65,7 @@ public class RCAScheduler { ScheduledExecutorService scheduledPool; List connectedComponents; - Queryable db; + volatile Queryable db; RcaConf rcaConf; ThresholdMain thresholdMain; Persistable persistable; diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rest/QueryRcaRequestHandler.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rest/QueryRcaRequestHandler.java index 567f62eb6..5027495ab 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rest/QueryRcaRequestHandler.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rest/QueryRcaRequestHandler.java @@ -162,13 +162,20 @@ public void handle(HttpExchange exchange) throws IOException { private void handleClusterRcaRequest(Map params, HttpExchange exchange) throws IOException { + //check if we are querying from elected master + if (!validNodeRole()) { + JsonObject errResponse = new JsonObject(); + errResponse.addProperty("error", "Node being queried is not elected master."); + sendResponse(exchange, errResponse.toString(), + HttpURLConnection.HTTP_BAD_REQUEST); + return; + } List rcaList = metricsRestUtil.parseArrayParam(params, NAME_PARAM, true); // query all cluster level RCAs if no RCA is specified in name. 
if (rcaList.isEmpty()) { - rcaList = SQLiteQueryUtils.getClusterLevelRca(); - } - //check if RCA is valid - if (!validParams(rcaList)) { + // rcaList = SQLiteQueryUtils.getClusterLevelRca(); + rcaList = persistable.getAllPersistedRcas(); + } else if (!validParams(rcaList)) { JsonObject errResponse = new JsonObject(); JsonArray errReason = new JsonArray(); SQLiteQueryUtils.getClusterLevelRca().forEach(errReason::add); @@ -178,14 +185,6 @@ private void handleClusterRcaRequest(Map params, HttpExchange ex HttpURLConnection.HTTP_BAD_REQUEST); return; } - //check if we are querying from elected master - if (!validNodeRole()) { - JsonObject errResponse = new JsonObject(); - errResponse.addProperty("error", "Node being queried is not elected master."); - sendResponse(exchange, errResponse.toString(), - HttpURLConnection.HTTP_BAD_REQUEST); - return; - } String response = getRcaData(persistable, rcaList).toString(); sendResponse(exchange, response, HttpURLConnection.HTTP_OK); } diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaControllerTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaControllerTest.java index 269936dc8..cb4f903a1 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaControllerTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaControllerTest.java @@ -15,6 +15,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.RCAScheduler; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.RcaSchedulerState; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.spec.MetricsDBProviderTestHelper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; import 
com.amazon.opendistro.elasticsearch.performanceanalyzer.reader_writer_shared.Event; import com.amazon.opendistro.elasticsearch.performanceanalyzer.threads.ThreadProvider; @@ -33,7 +34,6 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Executors; @@ -45,7 +45,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.powermock.reflect.Whitebox; @Category(GradleTaskForRca.class) public class RcaControllerTest { @@ -114,7 +113,8 @@ public void setUp() throws Exception { rcaEnabledFileLoc.toString(), 100, 200, - new AppContext() + new AppContext(), + new MetricsDBProviderTestHelper() ); setMyIp(masterIP, AllMetrics.NodeRole.UNKNOWN); @@ -187,14 +187,14 @@ public void readAndUpdateMutedRcasBeforeGraphCreation() throws Exception { Field mutedGraphNodesField = Stats.class.getDeclaredField("mutedGraphNodes"); mutedGraphNodesField.setAccessible(true); mutedGraphNodesField.set(Stats.getInstance(), null); - Set initialComponentSet = ConnectedComponent.getNodeNames(); - Whitebox.setInternalState(ConnectedComponent.class, "nodeNames", new HashSet<>()); + Set initialComponentSet = ConnectedComponent.getNodesForAllComponents(rcaController.getConnectedComponents()); + // Whitebox.setInternalState(ConnectedComponent.class, "nodeNames", new HashSet<>()); readAndUpdateMutesRcas.invoke(rcaController); Assert.assertNull(Stats.getInstance().getMutedGraphNodes()); // Re-set back to initialComponentSet - Whitebox.setInternalState(ConnectedComponent.class, "nodeNames", initialComponentSet); + // Whitebox.setInternalState(ConnectedComponent.class, "nodeNames", initialComponentSet); } @Test diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/TestApi.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/TestApi.java new 
file mode 100644 index 000000000..c4708f334 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/TestApi.java @@ -0,0 +1,89 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.Cluster; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.HostTag; +import com.google.gson.JsonArray; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nonnull; +import org.jooq.Record; +import org.jooq.Result; + +/** + * This is the API class whose object is injected into each of the test methods in case the test class + * declares a {@code setTestApi(final TestApi api)}. + */ +public class TestApi { + /** + * An instance of the cluster object to get access to the nodes to query various data to see + * that the tests have the desired results. + */ + private final Cluster cluster; + + public TestApi(Cluster cluster) { + this.cluster = cluster; + } + + /** + * Returns the contents of all the RCA tables from all the nodes in the cluster in a json + * format. This can be too much data based on how long the test was running for. + * @return A json array representing the table data from all the nodes. + */ + public JsonArray getAllRcaData() { + return cluster.getAllRcaData(); + } + + /** + * Get all the data for a particular RCA by name. + * @param rcaName The name of the RCA for which the data is desired. + * @return The RCA data in json format. + */ + public JsonArray getDataForRca(String rcaName) { + return cluster.getDataForRca(rcaName); + } + + /** + * To get the result from all the RCA and summary tables across all nodes in the cluster. 
+ * The key to the map is the node + * @return + */ + public Map>> getRecordsForAllTables() { + return cluster.getRecordsForAllTables(); + } + + /** + * Get the data for all tables for a particular host identified by its hostTags + * @param hostTag + * @return All the data for that host. + * @throws IllegalArgumentException if you provide the tag that does not have an associated + * host with it. + */ + public List> getRecordsForAllTables(HostTag hostTag) { + return cluster.getRecordsForAllTables(hostTag); + } + + /** + * @param hostTag the tag for the host. + * @param rcaName The data for the RCA which is asked for. + * @return All rows from the rca table from the host. + * @throws IllegalArgumentException if you provide a tag that does not have a host associated + * with it. + */ + public Result getRecordsForAllTables(HostTag hostTag, String rcaName) { + return cluster.getRecordsForAllTables(hostTag, rcaName); + } + + /** + * This lets you make a REST request to the REST endpoint of a particular host identified by + * the host tag. + * @param params the key value map that is passed as the request parameter. + * @param hostByTag The host whose rest endpoint we will hit. + * @return The response serialized as a String. + */ + public String getRcaRestResponse(@Nonnull final Map params, + HostTag hostByTag) { + Objects.requireNonNull(params); + return cluster.getRcaRestResponse(params, hostByTag); + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Cluster.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Cluster.java new file mode 100644 index 000000000..7f4beee91 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Cluster.java @@ -0,0 +1,308 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerWebServer; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.core.Util; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.ConnectedComponent; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AMetric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.ClusterType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.Consts; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.HostTag; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.threads.ThreadProvider; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.threads.exceptions.PAThreadException; +import com.google.gson.JsonArray; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import 
java.util.concurrent.BlockingQueue; +import org.jooq.Record; +import org.jooq.Result; + +public class Cluster { + private static final Map hostIdToHostTagMapForDedicatedMaster = + new HashMap() {{ + put(0, HostTag.ELECTED_MASTER); + put(1, HostTag.STANDBY_MASTER_0); + put(2, HostTag.STANDBY_MASTER_1); + put(3, HostTag.DATA_0); + put(4, HostTag.DATA_1); + put(5, HostTag.DATA_2); + }}; + + private static final Map hostIdToHostTagMapCoLocatedMaster = + new HashMap() {{ + put(0, HostTag.ELECTED_MASTER); + put(1, HostTag.DATA_0); + put(2, HostTag.DATA_1); + }}; + + private static final HostTag hostIdToHostTagMapSingleNode = HostTag.DATA_0; + + private final BlockingQueue exceptionQueue; + private final boolean useHttps; + private final ClusterType clusterType; + private final List hostList; + private final File clusterDir; + private final ThreadProvider threadProvider; + private final Map> roleToHostMap; + private final boolean rcaEnabled; + private final Thread errorHandlingThread; + private final Map tagToHostMapping; + + /** + * @param type The type of cluster - can be dedicated master, colocated master or single node. + * @param clusterDir The directory that will be used by the cluster for files. + * @param useHttps Should the http and grpc connections use https. 
+ */ + public Cluster(final ClusterType type, final File clusterDir, final boolean useHttps) { + this.clusterType = type; + this.hostList = new ArrayList<>(); + this.roleToHostMap = new HashMap<>(); + this.clusterDir = clusterDir; + this.rcaEnabled = true; + this.useHttps = useHttps; + this.threadProvider = new ThreadProvider(); + this.exceptionQueue = new ArrayBlockingQueue<>(1); + this.tagToHostMapping = new HashMap<>(); + + switch (type) { + case SINGLE_NODE: + // createSingleNodeCluster(useHttps, rcaEnabled); + break; + case MULTI_NODE_CO_LOCATED_MASTER: + // createMultiNodeCoLocatedMaster(useHttps, rcaEnabled); + break; + case MULTI_NODE_DEDICATED_MASTER: + createMultiNodeDedicatedMaster(); + break; + } + + for (Host host : hostList) { + host.setClusterDetails(hostList); + } + this.errorHandlingThread = PerformanceAnalyzerApp.startErrorHandlingThread(threadProvider, exceptionQueue); + } + + private void createMultiNodeDedicatedMaster() { + int currWebServerPort = PerformanceAnalyzerWebServer.WEBSERVICE_DEFAULT_PORT; + int currGrpcServerPort = Util.RPC_PORT; + int hostIdx = 0; + + createHost(hostIdx, AllMetrics.NodeRole.ELECTED_MASTER, currWebServerPort, currGrpcServerPort); + + currWebServerPort += 1; + currGrpcServerPort += 1; + hostIdx += 1; + + for (int i = 0; i < Consts.numberOfStandbyMasterNodesInDedicatedMasterCluster; i++) { + createHost(hostIdx, AllMetrics.NodeRole.MASTER, currWebServerPort, currGrpcServerPort); + + currWebServerPort += 1; + currGrpcServerPort += 1; + hostIdx += 1; + } + + for (int i = 0; i < Consts.numberOfDataNodesInDedicatedMasterCluster; i++) { + createHost(hostIdx, AllMetrics.NodeRole.DATA, currWebServerPort, currGrpcServerPort); + + currWebServerPort += 1; + currGrpcServerPort += 1; + hostIdx += 1; + } + } + + private Host createHost(int hostIdx, + AllMetrics.NodeRole role, + int webServerPort, + int grpcServerPort) { + HostTag hostTag = getTagForHostIdForHostTagAssignment(hostIdx); + Host host = new Host(hostIdx, + useHttps, + 
role, + webServerPort, + grpcServerPort, + this.clusterDir, + rcaEnabled, + hostTag); + tagToHostMapping.put(hostTag, host); + hostList.add(host); + + List hostByRole = roleToHostMap.get(role); + + if (hostByRole == null) { + hostByRole = new ArrayList<>(); + hostByRole.add(host); + roleToHostMap.put(role, hostByRole); + } else { + hostByRole.add(host); + } + return host; + } + + private HostTag getTagForHostIdForHostTagAssignment(int hostId) { + switch (clusterType) { + case MULTI_NODE_DEDICATED_MASTER: + return hostIdToHostTagMapForDedicatedMaster.get(hostId); + case MULTI_NODE_CO_LOCATED_MASTER: + return hostIdToHostTagMapCoLocatedMaster.get(hostId); + case SINGLE_NODE: + return hostIdToHostTagMapSingleNode; + } + throw new IllegalStateException("No cluster type matches"); + } + + public void createServersAndThreads() throws Exception { + for (Host host : hostList) { + host.createServersAndThreads(threadProvider); + } + } + + public void startRcaControllerThread() { + for (Host host : hostList) { + host.startRcaControllerThread(); + } + } + + private void createSingleNodeCluster(boolean useHttps, boolean rcaEnabled) { + // Host host = new Host( + // 0, + // useHttps, + // AllMetrics.NodeRole.ELECTED_MASTER, + // PerformanceAnalyzerWebServer.WEBSERVICE_DEFAULT_PORT, + // Util.RPC_PORT, clusterDir, rcaEnabled); + // hostList.add(host); + } + + private void createMultiNodeCoLocatedMaster(boolean useHttps, boolean rcaEnabled) { + // int currWebServerPort = PerformanceAnalyzerWebServer.WEBSERVICE_DEFAULT_PORT; + // int currGrpcServerPort = Util.RPC_PORT; + // int hostIdx = 0; + + // Host host = + // new Host(0, useHttps, AllMetrics.NodeRole.ELECTED_MASTER, currWebServerPort, currGrpcServerPort, clusterDir, rcaEnabled); + // roleToHostMap.put(AllMetrics.NodeRole.ELECTED_MASTER, Collections.singletonList(host)); + + // currWebServerPort += 1; + // currGrpcServerPort += 1; + // hostIdx += 1; + + // List otherNodes = new ArrayList<>(); + // for (int i = 0; i < 
numberOfCoLocatedMasterHosts - 1; i++) { + // otherNodes.add( + // new Host(hostIdx, + // useHttps, + // AllMetrics.NodeRole.DATA, + // currWebServerPort, + // currGrpcServerPort, + // clusterDir, + // rcaEnabled)); + + // currWebServerPort += 1; + // currGrpcServerPort += 1; + // hostIdx += 1; + // } + // roleToHostMap.put(AllMetrics.NodeRole.DATA, otherNodes); + } + + public void deleteCluster() throws IOException { + for (List hosts : roleToHostMap.values()) { + for (Host host : hosts) { + host.deleteHost(); + } + } + errorHandlingThread.interrupt(); + // TODO(yojs): remove + // FileUtils.deleteDirectory(clusterDir); + } + + public void stopRcaScheduler() throws Exception { + for (Host host : hostList) { + host.stopRcaScheduler(); + } + } + + public void startRcaScheduler() throws Exception { + for (Host host : hostList) { + host.startRcaScheduler(); + } + } + + public void updateGraph(final Class rcaGraphClass) + throws NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException { + for (Host host : hostList) { + host.updateRcaGraph(rcaGraphClass); + } + } + + public void updateMetricsDB(AMetric[] metricAnnotations) throws Exception { + for (Host host : hostList) { + host.updateMetricsDB(metricAnnotations); + } + } + + public JsonArray getAllRcaData() { + JsonArray arr = new JsonArray(); + for (Host host : hostList) { + arr.add(host.getAllRcaData()); + } + return arr; + } + + public JsonArray getDataForRca(String rcaName) { + JsonArray arr = new JsonArray(); + for (Host host : hostList) { + arr.add(host.getDataForRca(rcaName)); + } + return arr; + } + + public Map>> getRecordsForAllTables() { + Map>> map = new HashMap<>(); + for (Host host: hostList) { + map.put(host.getMyTag(), host.getRecordsForAllTables()); + } + return map; + } + + public List> getRecordsForAllTables(HostTag hostTag) { + return verifyTag(hostTag).getRecordsForAllTables(); + } + + private Host verifyTag(HostTag hostTag) { + Host host = 
tagToHostMapping.get(hostTag); + if (host == null) { + throw new IllegalArgumentException("No host with tag '" + hostTag + "' exists. " + + "Available tags are: " + tagToHostMapping.keySet()); + } + return host; + } + + public Result getRecordsForAllTables(HostTag hostTag, String rcaName) { + return verifyTag(hostTag).getRecordsForTable(rcaName); + } + + public String getRcaRestResponse(final Map params, HostTag hostByTag) { + return verifyTag(hostByTag).makeRestRequest(params); + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Host.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Host.java new file mode 100644 index 000000000..d1809a69c --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Host.java @@ -0,0 +1,420 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.ClientServers; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.core.Util; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.GRPCConnectionManager; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaController; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.ConnectedComponent; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AMetric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.Table; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.Tuple; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.Consts; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.HostTag; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.RcaState; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.RCAScheduler; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.RcaSchedulerState; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.threads.ThreadProvider; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import 
java.io.BufferedReader; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.InvocationTargetException; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.ProtocolException; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.jooq.Record; +import org.jooq.Result; +import org.jooq.exception.DataAccessException; +import org.junit.Assert; + +/** + * This class simulates a cluster node that executes an RCA graph. Each node has its own + * - GRPC server, + * - web server + * - RCAController and everything that it starts. + */ +public class Host { + private final boolean useHttps; + /** + * Each host has its own AppContext instance. + */ + private final AppContext appContext; + private final HostTag myTag; + /** + * This uniquely identifies a host. + */ + private final int hostId; + /** + * For integration tests, where all the virtual nodes are part of the same JVM, the IP string does not matter. But for + * the sake of having this value filled, the string is always 127.0.0.1 (see createHostIp()).
+ */ + private final String hostIp; + private final AllMetrics.NodeRole role; + private final int webServerPort; + private final int grpcServerPort; + private final File hostDir; + private final boolean rcaEnabled; + private GRPCConnectionManager connectionManager; + private ClientServers clientServers; + private ScheduledExecutorService netOperationsExecutor; + private RcaControllerIt rcaController; + private Thread grpcThread; + private Thread webServerThread; + private Thread rcaControllerThread; + private ThreadProvider threadProvider; + private Path rcaEnabledFile; + + public Host(int hostId, + boolean useHttps, + AllMetrics.NodeRole role, + int httpServerPort, + int grpcServerPort, + File clusterDir, + boolean rcaEnabled, + HostTag myTag) { + this.rcaEnabled = rcaEnabled; + this.useHttps = useHttps; + this.appContext = new AppContext(); + + this.hostId = hostId; + this.myTag = myTag; + + //TODO: make sure this works with the grpc and the webserver. + this.hostIp = createHostIp(); + this.role = role; + + this.webServerPort = httpServerPort; + this.grpcServerPort = grpcServerPort; + + this.hostDir = createHostDir(clusterDir, myTag); + } + + public static String createHostIp() { + return "127.0.0.1"; + } + + private static File createHostDir(File clusterDir, HostTag hostTag) { + File hostFile = Paths.get(clusterDir.getAbsolutePath(), hostTag.toString()).toFile(); + if (!hostFile.exists() && !hostFile.mkdirs()) { + throw new IllegalStateException("Couldn't create dir: " + hostFile); + } + return hostFile; + } + + public void createServersAndThreads(final ThreadProvider threadProvider) throws Exception { + this.threadProvider = threadProvider; + Objects.requireNonNull(appContext.getClusterDetailsEventProcessor(), + "ClusterDetailsEventProcessor cannot be null in the AppContext"); + + rcaEnabledFile = Paths.get(hostDir.getAbsolutePath(), RcaController.RCA_ENABLED_CONF_FILE); + setRcaState(RcaState.fromBoolean(rcaEnabled)); + + this.connectionManager = new 
GRPCConnectionManager(useHttps); + this.clientServers = PerformanceAnalyzerApp.createClientServers(connectionManager, + grpcServerPort, + null, + null, + useHttps, + String.valueOf(webServerPort), + null, // A null host is fine as this will use the loopback address + this.appContext); + + this.grpcThread = PerformanceAnalyzerApp.startGrpcServerThread(clientServers.getNetServer(), threadProvider); + this.webServerThread = PerformanceAnalyzerApp.startWebServerThread(clientServers.getHttpServer(), threadProvider); + + netOperationsExecutor = + Executors.newScheduledThreadPool( + 3, new ThreadFactoryBuilder().setNameFormat("test-network-thread-%d").build()); + + this.rcaController = new RcaControllerIt( + threadProvider, + netOperationsExecutor, + connectionManager, + clientServers, + hostDir.getAbsolutePath(), + 10, + 10, + role, + appContext, + null); + } + + // We create a temporary file and then swap it for the rca.enabled file. + public void setRcaState(RcaState rcaState) throws IOException { + Path rcaEnabledTmp = Paths.get(rcaEnabledFile + ".tmp"); + try (FileWriter f2 = new FileWriter(rcaEnabledTmp.toFile(), false /*To create a new file*/)) { + boolean value = true; + switch (rcaState) { + case ENABLED: + value = true; + break; + case DISABLED: + value = false; + break; + } + f2.write(String.valueOf(value)); + } catch (IOException e) { + e.printStackTrace(); + return; + } + rcaEnabledTmp.toFile().renameTo(rcaEnabledFile.toFile()); + } + + public void setClusterDetails(final List allHosts) { + List nodeDetails = new ArrayList<>(); + + // The first node in the list is always the node-itself. 
+ nodeDetails.add(hostToNodeDetails(this)); + + for (Host host : allHosts) { + if (host.hostId != this.hostId) { + nodeDetails.add(hostToNodeDetails(host)); + } + } + ClusterDetailsEventProcessor clusterDetailsEventProcessor = new ClusterDetailsEventProcessor(); + clusterDetailsEventProcessor.setNodesDetails(nodeDetails); + appContext.setClusterDetailsEventProcessor(clusterDetailsEventProcessor); + } + + public static ClusterDetailsEventProcessor.NodeDetails hostToNodeDetails(final Host host) { + return new ClusterDetailsEventProcessor.NodeDetails( + host.role, + host.getMyTag().toString(), + host.hostIp, + host.isElectedMaster()); + } + + public boolean isElectedMaster() { + return AllMetrics.NodeRole.ELECTED_MASTER == this.role; + } + + public void deleteHost() { + RCAScheduler rcaScheduler = rcaController.getRcaScheduler(); + if (rcaScheduler != null && rcaScheduler.getState() == RcaSchedulerState.STATE_STARTED) { + rcaScheduler.shutdown(); + } + netOperationsExecutor.shutdown(); + try { + netOperationsExecutor.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + e.printStackTrace(); + } + clientServers.getHttpServer().stop(0); + clientServers.getNetClient().stop(); + clientServers.getNetServer().stop(); + + connectionManager.shutdown(); + + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + ie.printStackTrace(); + } + + webServerThread.interrupt(); + + clientServers.getNetServer().setAttemptedShutdown(); + grpcThread.interrupt(); + + rcaController.setDeliberateInterrupt(); + + System.out.println("RCA Controller thread for host " + hostId + " is being interrupted."); + rcaControllerThread.interrupt(); + + // TODO(yojs): remove + //FileUtils.deleteDirectory(hostDir); + + System.out.println("Host '" + hostId + "' with role '" + rcaController.getCurrentRole() + "' cleaned up"); + } + + public void startRcaControllerThread() { + this.rcaControllerThread = PerformanceAnalyzerApp.startRcaTopLevelThread(rcaController, 
threadProvider); + } + + public void stopRcaScheduler() throws Exception { + setRcaState(RcaState.DISABLED); + // rcaController.waitForRcaState(false); + Thread.sleep(1000); + } + + public void startRcaScheduler() throws Exception { + setRcaState(RcaState.ENABLED); + // rcaController.waitForRcaState(true); + Thread.sleep(1000); + } + + public void updateRcaGraph(final Class rcaGraphClass) + throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { + rcaController.setRcaGraphComponents(rcaGraphClass); + } + + public void updateMetricsDB(AMetric[] metricAnnotations) throws Exception { + RcaItMetricsDBProvider dbProvider = + new RcaItMetricsDBProvider(Paths.get(hostDir.getPath(), "metricsdb").toString()); + for (AMetric metric : metricAnnotations) { + boolean foundDataForHost = false; + // Each metric can have only one data table that can be associated to a host. + // Which one is determined by the hostTag. The first matching is added to the host + // for the current metric. + dataLoop: for (Table table : metric.tables()) { + for (HostTag dataTag: table.hostTag()) + if (myTag == dataTag) { + // First data-tag to match the hostTags is considered to be a match + for (Tuple tuple : table.tuple()) { + dbProvider.insertRow(metric.name(), + metric.dimensionNames(), + tuple.dimensionValues(), + tuple.min(), + tuple.max(), + tuple.avg(), + tuple.sum()); + } + foundDataForHost = true; + // We found a data table matching the tags of the host. Let's move to the + // next metric. + break dataLoop; + } + } + if (!foundDataForHost) { + // This is not an error though. For example, a dedicated master node cannot emit + // a shard related metric. 
+ System.out.println("No data found for host " + hostId + " for metric " + metric.name()); + } + } + + rcaController.setDbProvider(dbProvider); + } + + public JsonObject getAllRcaData() { + JsonParser parser = new JsonParser(); + JsonElement data = parser.parse(this.rcaController.getPersistenceProvider().read()); + JsonObject obj = new JsonObject(); + obj.addProperty(Consts.HOST_ID_KEY, myTag.toString()); + obj.addProperty(Consts.HOST_ROLE_KEY, role.toString()); + obj.add(Consts.DATA_KEY, data); + return obj; + } + + public JsonElement getDataForRca(String rcaName) { + JsonElement data = this.rcaController.getPersistenceProvider().read(rcaName); + JsonObject obj = new JsonObject(); + obj.addProperty(Consts.HOST_ID_KEY, hostId); + obj.addProperty(Consts.HOST_ROLE_KEY, role.toString()); + obj.add(Consts.DATA_KEY, data); + return obj; + } + + public List> getRecordsForAllTables() { + return this.rcaController.getPersistenceProvider().getRecordsForAllTables(); + } + + Result getRecordsForTable(String tableName) { + return this.rcaController.getPersistenceProvider().getRecordsForTable(tableName); + } + + public String makeRestRequest(final Map kvRequestParams) { + StringBuilder queryString = new StringBuilder(); + + String appender = ""; + for (Map.Entry entry: kvRequestParams.entrySet()) { + queryString.append(appender).append(entry.getKey()).append("=").append(entry.getValue()); + appender = "&"; + } + StringBuilder uri = + new StringBuilder("http://localhost:" + webServerPort + Util.RCA_QUERY_URL); + uri.append("?").append(queryString); + + URL url = null; + try { + url = new URL(uri.toString()); + } catch (MalformedURLException e) { + e.printStackTrace(); + Assert.fail(); + } + + String response = ""; + HttpURLConnection connection = null; + + try { + connection = (HttpURLConnection) url.openConnection(); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + + try { + connection.setRequestMethod("GET"); + } catch (ProtocolException e) { + 
e.printStackTrace(); + connection.disconnect(); + Assert.fail(); + } + + try { + int status = connection.getResponseCode(); + if (status != 200) { + List ret = + new BufferedReader(new InputStreamReader(connection.getErrorStream())).lines().collect(Collectors.toList()); + throw new IllegalStateException(ret.toString()); + } + } catch (IOException e) { + e.printStackTrace(); + connection.disconnect(); + Assert.fail(); + } + + try (BufferedReader in = new BufferedReader( + new InputStreamReader(connection.getInputStream()))) { + String inputLine; + StringBuffer content = new StringBuffer(); + while ((inputLine = in.readLine()) != null) { + content.append(inputLine); + } + response = content.toString(); + } catch (IOException e) { + e.printStackTrace(); + connection.disconnect(); + Assert.fail(); + } + return response; + } + + public HostTag getMyTag() { + return myTag; + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/RcaConfIt.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/RcaConfIt.java new file mode 100644 index 000000000..32c15bb98 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/RcaConfIt.java @@ -0,0 +1,50 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts; +import java.util.HashMap; +import java.util.Map; + +public class RcaConfIt extends RcaConf { + private String rcaDataStorePath; + private String rcaAnalysisGraph; + + public RcaConfIt(RcaConf rcaConf) { + super(rcaConf.getConfigFileLoc()); + } + + public void setRcaDataStorePath(String dataStorePath) { + this.rcaDataStorePath = dataStorePath; + } + + public void setRcaAnalysisGraph(String rcaAnalysisGraph) { + this.rcaAnalysisGraph = rcaAnalysisGraph; + } + + @Override + public Map getDatastore() { + Map map = new HashMap<>(super.getDatastore()); + map.put(RcaConsts.DATASTORE_LOC_KEY, rcaDataStorePath); + return map; + } + + @Override + public String getAnalysisGraphEntryPoint() { + return rcaAnalysisGraph; + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/RcaControllerIt.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/RcaControllerIt.java new file mode 100644 index 000000000..9ef6ebb5a --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/RcaControllerIt.java @@ -0,0 +1,107 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.ClientServers; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.GRPCConnectionManager; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaController; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.AnalysisGraph; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.ConnectedComponent; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Queryable; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaUtil; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.threads.ThreadProvider; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.util.WaitFor; +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class RcaControllerIt extends RcaController { + private final String rcaPath; + private List rcaGraphComponents; + + public RcaControllerIt(ThreadProvider threadProvider, + ScheduledExecutorService netOpsExecutorService, + GRPCConnectionManager grpcConnectionManager, + ClientServers clientServers, + String rca_enabled_conf_location, + long rcaStateCheckIntervalMillis, + long nodeRoleCheckPeriodicityMillis, + AllMetrics.NodeRole nodeRole, + final AppContext appContext, + final Queryable dbProvider) { + super(threadProvider, + netOpsExecutorService, + 
grpcConnectionManager, + clientServers, + rca_enabled_conf_location, + rcaStateCheckIntervalMillis, + nodeRoleCheckPeriodicityMillis, + appContext, + dbProvider); + this.currentRole = nodeRole; + this.rcaPath = rca_enabled_conf_location; + } + + @Override + protected List getRcaGraphComponents(RcaConf rcaConf) throws + ClassNotFoundException, + NoSuchMethodException, + InvocationTargetException, + InstantiationException, + IllegalAccessException { + if (rcaGraphComponents != null) { + return rcaGraphComponents; + } else { + return super.getRcaGraphComponents(rcaConf); + } + } + + @Override + protected RcaConf getRcaConfForMyRole(AllMetrics.NodeRole nodeRole) { + RcaConfIt rcaConfIt = new RcaConfIt(super.getRcaConfForMyRole(nodeRole)); + rcaConfIt.setRcaDataStorePath(rcaPath); + return rcaConfIt; + } + + public void setRcaGraphComponents(Class rcaGraphClass) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { + AnalysisGraph graphObject = + (AnalysisGraph) rcaGraphClass.getDeclaredConstructor().newInstance(); + this.rcaGraphComponents = RcaUtil.getAnalysisGraphComponents(graphObject); + } + + public void setDbProvider(final Queryable db) { + dbProvider = db; + } + + protected void checkUpdateNodeRole() { + } + + public synchronized void setNodeRole(AllMetrics.NodeRole role) { + this.currentRole = role; + } + + public void waitForRcaState(boolean enabled) throws Exception { + WaitFor.waitFor(() -> { + boolean isEnabled = isRcaEnabled(); + return isEnabled == enabled; + }, 20, TimeUnit.SECONDS); + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/RcaItMetricsDBProvider.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/RcaItMetricsDBProvider.java new file mode 100644 index 000000000..371cbb187 --- /dev/null +++ 
b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/RcaItMetricsDBProvider.java
@@ -0,0 +1,91 @@
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework;
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.Dimensions;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.Metric;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.MetricsDBProvider;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+
+/**
+ * A {@link MetricsDBProvider} for the RCA integration tests. It always hands out the one
+ * {@link MetricsDB} it created, whose on-disk file is the path given to the constructor.
+ */
+public class RcaItMetricsDBProvider extends MetricsDBProvider {
+  private final String dbFilePath;
+  private final MetricsDB db;
+
+  /**
+   * Deletes the file at the given path and its "-journal" sibling, if they exist, and then
+   * creates the MetricsDB that reports this path as its file.
+   *
+   * @param metricsDbFilePath path of the file backing the metrics database
+   * @throws Exception if an existing file cannot be deleted or the MetricsDB constructor fails
+   */
+  public RcaItMetricsDBProvider(String metricsDbFilePath) throws Exception {
+    dbFilePath = metricsDbFilePath;
+    // Clean up leftovers. deleteIfExists() treats a missing file as a no-op, so only real I/O
+    // failures propagate.
+    Files.deleteIfExists(Paths.get(dbFilePath));
+    Files.deleteIfExists(Paths.get(dbFilePath + "-journal"));
+
+    // TODO: clean up the DB after the tests are done.
+    db = new MetricsDB(System.currentTimeMillis()) {
+      @Override
+      public String getDBFilePath() {
+        Path configPath = Paths.get(RcaItMetricsDBProvider.this.dbFilePath);
+        return configPath.toString();
+      }
+
+      @Override
+      public void deleteOnDiskFile() {
+        try {
+          Files.delete(Paths.get(getDBFilePath()));
+        } catch (IOException e) {
+          // Best effort: report the failure but do not fail the caller.
+          e.printStackTrace();
+        }
+      }
+    };
+  }
+
+  @Override
+  public MetricsDB getMetricsDB() {
+    return db;
+  }
+
+  /**
+   * Registers the metric with the database through createMetric() and stores one row for it
+   * through putMetric().
+   *
+   * @param metricName name of the metric
+   * @param dimensionNames names of the dimension columns
+   * @param dimensionValues value of each dimension, in the order of {@code dimensionNames}
+   * @param min min value of the row
+   * @param max max value of the row
+   * @param avg avg value of the row
+   * @param sum sum value of the row
+   * @throws IllegalArgumentException if the numbers of dimension names and values differ
+   */
+  public void insertRow(String metricName,
+                        String[] dimensionNames,
+                        String[] dimensionValues,
+                        double min, double max, double avg, double sum) {
+    if (dimensionNames.length != dimensionValues.length) {
+      throw new IllegalArgumentException("Metric " + metricName + " has "
+          + dimensionNames.length + " dimension names but " + dimensionValues.length
+          + " dimension values");
+    }
+    Dimensions dimensions = new Dimensions();
+    for (int i = 0; i < dimensionNames.length; i++) {
+      dimensions.put(dimensionNames[i], dimensionValues[i]);
+    }
+    Metric metric = new Metric(metricName, sum, avg, min, max);
+
+    db.createMetric(metric, Arrays.asList(dimensionNames));
+    db.putMetric(metric, dimensions, 0);
+  }
+}
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/TestEnvironment.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/TestEnvironment.java
new file mode 100644
index 000000000..7ce2aa1cd
--- /dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/TestEnvironment.java
@@ -0,0 +1,188 @@
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework;
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaControllerHelper;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.AnalysisGraph;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaUtil;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AMetric;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ARcaConf;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ARcaGraph;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Applies the {@link ARcaConf}, {@link ARcaGraph} and {@link AMetric} annotations of a test class
+ * and of its test methods to the cluster, and tracks which of them have been applied.
+ */
+public class TestEnvironment {
+  private final Cluster cluster;
+  private final Env classLevelEnv;
+
+  private Env currentEnv;
+
+  /**
+   * Applies the class level annotations of the test class to the cluster.
+   *
+   * @param cluster the cluster that the annotations are applied to
+   * @param testClass the test class whose annotations make up the class level environment
+   * @throws Exception if applying the rca graph or the metrics to the cluster fails
+   */
+  public TestEnvironment(final Cluster cluster, final Class<?> testClass) throws Exception {
+    this.cluster = cluster;
+    this.classLevelEnv = updateEnvironment(testClass);
+    // Until a method overrides it, the class level environment is the current one.
+    this.currentEnv = this.classLevelEnv;
+  }
+
+  /**
+   * Builds the class level environment. When the class does not provide rca.conf paths, the
+   * defaults declared on {@link ARcaConf} are used.
+   */
+  private Env updateEnvironment(final Class<?> testClass) throws Exception {
+    Env env = new Env();
+
+    // Each annotation is looked up on its own, so that a class carrying only one of them (for
+    // example only AMetric) is still handled.
+    ARcaConf aRcaConf = testClass.getAnnotation(ARcaConf.class);
+    if (aRcaConf != null) {
+      updateRcaConf(aRcaConf, env);
+    }
+    ARcaGraph aRcaGraph = testClass.getAnnotation(ARcaGraph.class);
+    if (aRcaGraph != null) {
+      updateRcaGraph(aRcaGraph, env);
+    }
+    // getAnnotationsByType() returns a single AMetric as well as the repeated ones that are
+    // stored inside the AMetric.Metrics container.
+    AMetric[] metrics = testClass.getAnnotationsByType(AMetric.class);
+    if (metrics.length > 0) {
+      updateMetricsDB(metrics, env);
+    }
+
+    if (env.rcaConfMap.isEmpty()) {
+      updateWithDefaultRcaConfAnnotation(env);
+    }
+    return env;
+  }
+
+  private void updateRcaConf(ARcaConf aRcaConf, Env env) {
+    String masterRcaConf = aRcaConf.electedMaster();
+    env.rcaConfMap.put(ARcaConf.Type.ELECTED_MASTER, masterRcaConf);
+
+    String standByMasterRcaConf = aRcaConf.standBy();
+    env.rcaConfMap.put(ARcaConf.Type.STANDBY_MASTER, standByMasterRcaConf);
+
+    String dataNodesRcaConf = aRcaConf.dataNode();
+    env.rcaConfMap.put(ARcaConf.Type.DATA_NODES, dataNodesRcaConf);
+
+    RcaControllerHelper.set(dataNodesRcaConf, standByMasterRcaConf, masterRcaConf);
+  }
+
+  private void updateRcaGraph(ARcaGraph aRcaGraph, Env env) throws NoSuchMethodException,
+      IllegalAccessException, InvocationTargetException, InstantiationException {
+    Class graphClass = aRcaGraph.value();
+    env.rcaGraphClass = graphClass;
+    cluster.updateGraph(graphClass);
+  }
+
+  private void updateMetricsDB(AMetric[] metrics, Env env) throws Exception {
+    cluster.updateMetricsDB(metrics);
+    env.isMetricsDBProviderSet = true;
+  }
+
+  private void updateWithDefaultRcaConfAnnotation(Env env) throws NoSuchMethodException {
+    String masterRcaConf = (String) ARcaConf.class.getMethod("electedMaster").getDefaultValue();
+    env.rcaConfMap.put(ARcaConf.Type.ELECTED_MASTER, masterRcaConf);
+
+    String standByMasterRcaConf =
+        (String) ARcaConf.class.getMethod("standBy").getDefaultValue();
+    env.rcaConfMap.put(ARcaConf.Type.STANDBY_MASTER, standByMasterRcaConf);
+
+    String dataNodesRcaConf =
+        (String) ARcaConf.class.getMethod("dataNode").getDefaultValue();
+    env.rcaConfMap.put(ARcaConf.Type.DATA_NODES, dataNodesRcaConf);
+
+    RcaControllerHelper.set(dataNodesRcaConf, standByMasterRcaConf, masterRcaConf);
+  }
+
+  /**
+   * Layers the annotations of the test method on top of the class level environment and makes
+   * the result the current environment.
+   *
+   * @param method the test method that is about to run
+   * @throws Exception if applying the rca graph or the metrics to the cluster fails
+   */
+  public void updateEnvironment(final Method method) throws Exception {
+    this.currentEnv = new Env(this.classLevelEnv);
+
+    ARcaConf aRcaConf = method.getAnnotation(ARcaConf.class);
+    if (aRcaConf != null) {
+      updateRcaConf(aRcaConf, currentEnv);
+    }
+    ARcaGraph aRcaGraph = method.getAnnotation(ARcaGraph.class);
+    if (aRcaGraph != null) {
+      updateRcaGraph(aRcaGraph, currentEnv);
+    }
+    // Covers a single AMetric on the method and the AMetric.Metrics container alike.
+    AMetric[] metrics = method.getAnnotationsByType(AMetric.class);
+    if (metrics.length > 0) {
+      updateMetricsDB(metrics, currentEnv);
+    }
+  }
+
+  /**
+   * Makes the class level environment the current one again. This only resets the bookkeeping
+   * of this object; it does not undo what a method level annotation applied to the cluster.
+   */
+  public void clearUpMethodLevelEnvOverride() {
+    this.currentEnv = this.classLevelEnv;
+  }
+
+  /**
+   * Checks that the rca.conf paths, the rca graph and the metrics are set for the current
+   * environment.
+   *
+   * @throws IllegalStateException naming the annotations that are missing
+   */
+  public void verifyEnvironmentSetup() throws IllegalStateException, NoSuchMethodException {
+    List<String> annotationsNotSet = new ArrayList<>();
+
+    if (currentEnv.rcaConfMap.isEmpty()) {
+      annotationsNotSet.add(ARcaConf.class.getSimpleName());
+    }
+    if (currentEnv.rcaGraphClass == null) {
+      annotationsNotSet.add(ARcaGraph.class.getSimpleName());
+    }
+    if (!currentEnv.isMetricsDBProviderSet) {
+      annotationsNotSet.add(AMetric.class.getSimpleName());
+    }
+
+    if (!annotationsNotSet.isEmpty()) {
+      StringBuilder err = new StringBuilder("** Some annotations are not set");
+      err.append(annotationsNotSet);
+      throw new IllegalStateException(err.toString());
+    }
+  }
+
+  /** The annotation driven settings that are in effect for a test class or a test method. */
+  static class Env {
+    private final Map<ARcaConf.Type, String> rcaConfMap;
+    private Class rcaGraphClass;
+    private boolean isMetricsDBProviderSet;
+
+    Env() {
+      this.rcaConfMap = new HashMap<>();
+      this.rcaGraphClass = null;
+      this.isMetricsDBProviderSet = false;
+    }
+
+    /** Copies the other environment. The map is copied so that the two stay independent. */
+    Env(Env other) {
+      this.rcaConfMap = new HashMap<>(other.rcaConfMap);
+      this.rcaGraphClass = other.rcaGraphClass;
+      this.isMetricsDBProviderSet = other.isMetricsDBProviderSet;
+    }
+  }
+}
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/AMetric.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/AMetric.java
new file mode 100644
index 000000000..ca825e737
--- /dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/AMetric.java
@@ -0,0 +1,25 @@
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations;
+
+import java.lang.annotation.Repeatable;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * Describes a metric for the RCA integration tests: its name, its dimension names and its
+ * tables of rows. The annotation is repeatable; repeated uses are stored in {@link Metrics}.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Repeatable(value = AMetric.Metrics.class)
+public @interface AMetric {
+  String name();
+
+  String[] dimensionNames();
+
+  Table[] tables();
+
+  /** Container annotation that holds repeated {@link AMetric} annotations. */
+  @Retention(RetentionPolicy.RUNTIME)
+  @interface Metrics {
+    AMetric[] value();
+  }
+}
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ARcaConf.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ARcaConf.java
new file mode 100644
index 000000000..5603319ca
--- /dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ARcaConf.java
@@ -0,0 +1,28 @@
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations;
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.Consts;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * The paths of the rca.conf files to use for the elected master, the standby masters and the
+ * data nodes. An element that is not given falls back to its default from {@link Consts}.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+// METHOD is included because TestEnvironment also reads this annotation from test methods.
+@Target({ElementType.TYPE, ElementType.METHOD})
+public @interface ARcaConf {
+  String electedMaster() default Consts.RCAIT_DEFAULT_RCA_CONF_ELECTED_MASTER_NODE;
+
+  String standBy() default Consts.RCAIT_DEFAULT_RCA_CONF_STANDBY_MASTER_NODE;
+
+  String dataNode() default Consts.RCAIT_DEFAULT_RCA_CONF_DATA_NODE;
+
+  enum Type {
+    ELECTED_MASTER,
+    STANDBY_MASTER,
+    DATA_NODES
+  }
+}
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ARcaGraph.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ARcaGraph.java
new file mode 100644
index 000000000..ee1c0499b
--- /dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ARcaGraph.java
@@ -0,0 +1,12 @@
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD})
+public @interface ARcaGraph {
+  Class value();
+}
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/Table.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/Table.java
new file mode 100644
index 000000000..77d5f6a61
--- /dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/Table.java
@@ -0,0 +1,17 @@
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations;
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.HostTag;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * A set of rows ({@link Tuple}s) together with the hosts, given as {@link HostTag}s, that the
+ * rows are meant for.
+ */
+// Retained at runtime explicitly, like the sibling annotations AMetric and Tuple.
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Table {
+  HostTag[] hostTag();
+
+  Tuple[] tuple();
+}
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/Tuple.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/Tuple.java
new file mode 100644
index 000000000..d3cdf92a6
--- /dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/Tuple.java
@@ -0,0 +1,18 @@
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/** One row of a {@link Table}: the dimension values and the min, max, sum and avg columns. */
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Tuple {
+  String[] dimensionValues();
+
+  double min();
+
+  double max();
+
+  double sum();
+
+  double avg();
+}
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/ClusterType.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/ClusterType.java
new file mode 100644
index 000000000..64485ad6d
---
/dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/ClusterType.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs;
+
+public enum ClusterType {
+  SINGLE_NODE,
+  MULTI_NODE_CO_LOCATED_MASTER,
+  MULTI_NODE_DEDICATED_MASTER
+}
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/Consts.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/Consts.java
new file mode 100644
index 000000000..94aa845b7
--- /dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/Consts.java
@@ -0,0 +1,25 @@
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs;
+
+/** Constants shared by the RCA integration test framework. */
+public final class Consts {
+  public static final String RCA_IT_BASE_DIR = "/tmp/rcaIt";
+  public static final String RCA_IT_CLUSTER_DIR_FORMAT = "yyyy.MM.dd.HH.mm.ss";
+  public static final String TEST_RESOURCES_DIR = "./src/test/resources/rca/";
+  public static final String RCAIT_DEFAULT_RCA_CONF_ELECTED_MASTER_NODE =
+      TEST_RESOURCES_DIR + "rca_elected_master.conf";
+  public static final String RCAIT_DEFAULT_RCA_CONF_STANDBY_MASTER_NODE =
+      TEST_RESOURCES_DIR + "rca_master.conf";
+  public static final String RCAIT_DEFAULT_RCA_CONF_DATA_NODE = TEST_RESOURCES_DIR + "rca.conf";
+
+  public static final String HOST_ID_KEY = "hostId";
+  public static final String HOST_ROLE_KEY = "hostRole";
+  public static final String DATA_KEY = "data";
+  public static final int numberOfDataNodesInDedicatedMasterCluster = 3;
+  public static final int numberOfStandbyMasterNodesInDedicatedMasterCluster = 2;
+  // Public like its siblings: a private constant that this class never reads is dead.
+  public static final int numberOfCoLocatedMasterHosts = 3;
+
+  private Consts() {
+    // Constants holder; not meant to be instantiated.
+  }
+}
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/HostTag.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/HostTag.java
new file mode 100644
index 000000000..919790ef8
--- /dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/HostTag.java
@@ -0,0 +1,10 @@
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs;
+
+public enum HostTag {
+  ELECTED_MASTER,
+  STANDBY_MASTER_0,
+  STANDBY_MASTER_1,
+  DATA_0,
+  DATA_1,
+  DATA_2
+}
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/RcaState.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/RcaState.java
new file mode 100644
index 000000000..359d9ec61
--- /dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/RcaState.java
@@ -0,0 +1,16 @@
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs;
+
+public enum RcaState {
+  ENABLED,
+  DISABLED;
+
+  /**
+   * Maps the boolean form of the RCA state to the enum.
+   *
+   * @param enableRca whether RCA is enabled
+   * @return {@link #ENABLED} for {@code true}, {@link #DISABLED} for {@code false}
+   */
+  public static RcaState fromBoolean(boolean enableRca) {
+    return enableRca ? ENABLED : DISABLED;
+  }
+}
diff --git
a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/DedicatedMasterWithHttpRunner.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/DedicatedMasterWithHttpRunner.java
new file mode 100644
index 000000000..66683b5ca
--- /dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/DedicatedMasterWithHttpRunner.java
@@ -0,0 +1,131 @@
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.runners;
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.TestApi;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.Cluster;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.TestEnvironment;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ARcaConf;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.ClusterType;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.junit.Test;
+import org.junit.runner.Description;
+import org.junit.runner.Runner;
+import org.junit.runner.notification.Failure;
+import org.junit.runner.notification.RunNotifier;
+
+/**
+ * A JUnit {@link Runner} that creates a {@link ClusterType#MULTI_NODE_DEDICATED_MASTER} cluster
+ * and runs every {@link Test} method of the test class against it.
+ */
+public class DedicatedMasterWithHttpRunner extends Runner implements IRcaItRunner {
+  public static final String SET_CLUSTER_METHOD = "setTestApi";
+  private static final ClusterType CLUSTER_TYPE = ClusterType.MULTI_NODE_DEDICATED_MASTER;
+  private final Class<?> testClass;
+  private final Object testObject;
+  private final TestEnvironment testEnvironment;
+
+  private ARcaConf rcaConf;
+  private Cluster cluster;
+
+  /**
+   * Creates the cluster, instantiates the test class, hands it the {@link TestApi} if it
+   * declares a setter for it, and applies the class level annotations.
+   *
+   * @param testClass the test class that is run with this runner
+   * @throws Exception if the cluster or the test object cannot be set up
+   */
+  public DedicatedMasterWithHttpRunner(Class<?> testClass) throws Exception {
+    super();
+    this.testClass = testClass;
+    this.cluster = createCluster(CLUSTER_TYPE);
+    TestApi api = new TestApi(cluster);
+    this.testObject = testClass.getDeclaredConstructor().newInstance();
+
+    try {
+      Method setClusterMethod = testClass.getMethod(SET_CLUSTER_METHOD, TestApi.class);
+      setClusterMethod.setAccessible(true);
+      setClusterMethod.invoke(testObject, api);
+    } catch (NoSuchMethodException ex) {
+      // This test class hasn't defined a method setTestApi(TestApi). So probably it does not
+      // need access to the test API, which is fine. We move on to the method execution.
+    }
+
+    cluster.createServersAndThreads();
+    this.testEnvironment = new TestEnvironment(cluster, testClass);
+    cluster.startRcaControllerThread();
+  }
+
+  @Override
+  public Description getDescription() {
+    return Description.createTestDescription(testClass, "A custom runner for RcaIt");
+  }
+
+  /**
+   * Runs every public method annotated with {@link Test} and deletes the cluster afterwards.
+   * A failing test is reported to the notifier and does not stop the remaining tests.
+   */
+  @Override
+  public void run(RunNotifier notifier) {
+    System.out.println("running the tests for: " + testClass);
+    try {
+      for (Method method : testClass.getMethods()) {
+        if (method.isAnnotationPresent(Test.class)) {
+          runTest(method, notifier);
+        }
+      }
+    } finally {
+      try {
+        cluster.deleteCluster();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /** Runs one test method and reports its start, its failure if there is one, and its end. */
+  private void runTest(Method method, RunNotifier notifier) {
+    Description description = Description.createTestDescription(testClass, method.getName());
+    notifier.fireTestStarted(description);
+    try {
+      this.testEnvironment.updateEnvironment(method);
+      this.testEnvironment.verifyEnvironmentSetup();
+      cluster.startRcaScheduler();
+      try {
+        method.invoke(testObject);
+      } finally {
+        // Stop the scheduler and drop the method level overrides even when the test failed.
+        cluster.stopRcaScheduler();
+        this.testEnvironment.clearUpMethodLevelEnvOverride();
+      }
+    } catch (InvocationTargetException e) {
+      // The reflective call wraps whatever the test threw; report the real cause.
+      Throwable cause = e.getCause() == null ? e : e.getCause();
+      notifier.fireTestFailure(new Failure(description, cause));
+    } catch (Exception e) {
+      notifier.fireTestFailure(new Failure(description, e));
+    } finally {
+      notifier.fireTestFinished(description);
+    }
+  }
+
+  @Override
+  public void setUpRunner() throws IOException {
+    this.cluster = createCluster(CLUSTER_TYPE);
+  }
+
+  @Override
+  public void tearDownRunner() {
+
+  }
+
+  @Override
+  public void setUpTest() {
+
+  }
+
+  @Override
+  public void tearDownTest() {
+
+  }
+}
diff --git
a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/IRcaItRunner.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/IRcaItRunner.java
new file mode 100644
index 000000000..c4509ec01
--- /dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/IRcaItRunner.java
@@ -0,0 +1,79 @@
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.runners;
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.Cluster;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.ClusterType;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.Consts;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import org.apache.commons.io.FileUtils;
+
+/** What the RCA-IT runners share: creating the cluster directory and the life cycle hooks. */
+public interface IRcaItRunner {
+  /**
+   * Creates a cluster whose directory is {@link Consts#RCA_IT_BASE_DIR} followed by the current
+   * time formatted with {@link Consts#RCA_IT_CLUSTER_DIR_FORMAT}.
+   *
+   * @param clusterType the type of the cluster to create
+   * @return the new cluster
+   * @throws IOException if preparing the directory or creating the cluster fails
+   */
+  default Cluster createCluster(ClusterType clusterType) throws IOException {
+    SimpleDateFormat sdf = new SimpleDateFormat(Consts.RCA_IT_CLUSTER_DIR_FORMAT);
+    Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+    String formattedTime = sdf.format(timestamp);
+    Path rcaItDir = Paths.get(Consts.RCA_IT_BASE_DIR, formattedTime);
+
+    cleanUpDirsFromLastRuns(rcaItDir);
+    createITDirForThisRun(rcaItDir);
+
+    return new Cluster(clusterType, rcaItDir.toFile(), false);
+  }
+
+  /** Deletes the given directory and everything below it. */
+  default void cleanUpDirsFromLastRuns(Path oldDirs) throws IOException {
+    FileUtils.deleteDirectory(oldDirs.toFile());
+  }
+
+  /**
+   * Creates the directory, including missing parents, unless it already exists.
+   *
+   * @throws IllegalStateException if the directory cannot be created
+   */
+  default void createITDirForThisRun(Path rcaItDir) {
+    File clusterDir = rcaItDir.toFile();
+    if (!clusterDir.exists() && !clusterDir.mkdirs()) {
+      // Name the directory that failed, not its parent.
+      throw new IllegalStateException("Couldn't create dir: " + clusterDir);
+    }
+  }
+
+  /**
+   * This does some setUp work for the cluster. It sets up the directory, initializes the
+   * required objects.
+   */
+  void setUpRunner() throws IOException;
+
+  /**
+   * This is executed after the runner has executed all the tests.
+   */
+  void tearDownRunner();
+
+  /**
+   * Prepares for the next test method.
+   * NOTE(review): the original comment broke off after "we make sure that the RCA scheduler is
+   * not"; presumably it meant "not running". Confirm and complete.
+   */
+  void setUpTest();
+
+  /**
+   * This method does cleanup work after executing each test method. The cluster does not
+   * change between test methods and might exist between two test classes if they happen to
+   * use the same runner.
+   * NOTE(review): the original comment broke off after "use the same runner but".
+   */
+  void tearDownTest();
+}
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/RcaItPoc.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/RcaItPoc.java new file mode 100644 index 000000000..adf620a17 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/RcaItPoc.java @@ -0,0 +1,77 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.tests; + +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.INDEX_NAME_VALUE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.OPERATION_VALUE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.SHARDID_VALUE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.SHARD_ROLE_VALUE; + + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.TestApi; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AMetric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ARcaGraph; +import
com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.Table; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.Tuple; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.HostTag; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.runners.DedicatedMasterWithHttpRunner; +import java.util.List; +import java.util.Map; +import org.jooq.Record; +import org.jooq.Result; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(DedicatedMasterWithHttpRunner.class) + +@ARcaGraph(SimpleAnalysisGraph.class) +@AMetric(name = "CPU_Utilization", + dimensionNames = {SHARDID_VALUE, INDEX_NAME_VALUE, OPERATION_VALUE, SHARD_ROLE_VALUE}, + tables = { + @Table(hostTag = HostTag.DATA_0, + tuple = { + @Tuple(dimensionValues = {"0", "logs", "bulk", "p"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 0.0), + @Tuple(dimensionValues = {"1", "logs", "bulk", "r"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 80.0), + @Tuple(dimensionValues = {"2", "logs", "bulk", "p"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 10.0) + } + ), + @Table(hostTag = {HostTag.DATA_1, HostTag.DATA_2}, + tuple = { + @Tuple(dimensionValues = {"0", "logs", "bulk", "r"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 50.0), + @Tuple(dimensionValues = {"1", "logs", "bulk", "p"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 5.0), + @Tuple(dimensionValues = {"2", "logs", "bulk", "r"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 11.0) + } + ) + } +) + +public class RcaItPoc { + private TestApi api; + + @Test + public void simple() throws InterruptedException { + Thread.sleep(1000); + int i = 0; + while (i < 10) { + // System.out.println(api.getAllRcaData().toString()); + + for (Map.Entry>> entry: + api.getRecordsForAllTables().entrySet()) { + // System.out.println(entry); + //System.out.println( + // api.getRcaRestResponse(new HashMap<>(), HostTag.ELECTED_MASTER)); 
+ } + + Thread.sleep(2000); + i++; + } + } + + public void setTestApi(final TestApi api) { + this.api = api; + } +}
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/SimpleAnalysisGraph.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/SimpleAnalysisGraph.java
new file mode 100644
index 000000000..21136f9b1
--- /dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/SimpleAnalysisGraph.java
@@ -0,0 +1,198 @@
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.tests;
+
+import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.INDEX_NAME;
+import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.SHARD_ID;
+import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.LOCUS_DATA_NODE;
+import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE;
+import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM;
+import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.TAG_LOCUS;
+
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.CPU_Utilization;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotShardSummary;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.ElasticSearchAnalysisGraph;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.hotshard.IndexShardKey;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.jooq.Record;
+
+/**
+ * A small analysis graph for the RCA integration tests: a CPU_Utilization metric feeds a node
+ * level RCA tagged for the data nodes, which feeds a cluster level RCA tagged for the master.
+ */
+public class SimpleAnalysisGraph extends ElasticSearchAnalysisGraph {
+
+  @Override
+  public void construct() {
+    CPU_Utilization cpuUtilization = new CPU_Utilization(1);
+    cpuUtilization.addTag(TAG_LOCUS, LOCUS_DATA_NODE);
+    addLeaf(cpuUtilization);
+
+    NodeRca nodeRca = new NodeRca(cpuUtilization);
+    nodeRca.addTag(TAG_LOCUS, LOCUS_DATA_NODE);
+    nodeRca.addAllUpstreams(Arrays.asList(cpuUtilization));
+
+    ClusterRca clusterRca = new ClusterRca(nodeRca);
+    clusterRca.addTag(TAG_LOCUS, LOCUS_MASTER_NODE);
+    clusterRca.addAllUpstreams(Collections.singletonList(nodeRca));
+    //clusterRca.addTag(TAG_AGGREGATE_UPSTREAM, LOCUS_DATA_NODE);
+  }
+
+  /** Reports the shard with the highest max CPU utilization among the rows seen on a node. */
+  public static class NodeRca extends Rca<ResourceFlowUnit<HotNodeSummary>> {
+    private final CPU_Utilization cpuUtilization;
+
+    public NodeRca(CPU_Utilization cpu_utilization) {
+      super(1);
+      this.cpuUtilization = cpu_utilization;
+    }
+
+    @Override
+    public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
+      final List<FlowUnitMessage> flowUnitMessages =
+          args.getWireHopper().readFromWire(args.getNode());
+      List<ResourceFlowUnit<HotNodeSummary>> flowUnitList = new ArrayList<>();
+      for (FlowUnitMessage flowUnitMessage : flowUnitMessages) {
+        flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage));
+      }
+      setFlowUnits(flowUnitList);
+    }
+
+    /**
+     * Goes through all the rows of the CPU_Utilization flow units and picks the shard with the
+     * highest max value above zero.
+     *
+     * @return an unhealthy flow unit that names the shard, or an empty flow unit if none is found
+     */
+    @Override
+    public ResourceFlowUnit<HotNodeSummary> operate() {
+      double maxCpu = 0;
+      IndexShardKey indexShardKey = null;
+      for (MetricFlowUnit metricFlowUnit : cpuUtilization.getFlowUnits()) {
+        if (metricFlowUnit.getData() == null) {
+          continue;
+        }
+        for (Record record : metricFlowUnit.getData()) {
+          try {
+            String indexName = record.getValue(INDEX_NAME.toString(), String.class);
+            Integer shardId = record.getValue(SHARD_ID.toString(), Integer.class);
+            if (indexName == null || shardId == null) {
+              continue;
+            }
+            // Read as Double, not double: a null value would otherwise fail while unboxing.
+            Double usage = record.getValue(MetricsDB.MAX, Double.class);
+            if (usage != null && usage > maxCpu) {
+              maxCpu = usage;
+              indexShardKey = IndexShardKey.buildIndexShardKey(record);
+            }
+          } catch (IllegalArgumentException ignored) {
+            // Presumably the row lacks one of the columns read above (jOOQ reports an unknown
+            // field name this way); such a row cannot name a hot shard and is skipped.
+          }
+        }
+      }
+      InstanceDetails instanceDetails = getInstanceDetails();
+      HotNodeSummary nodeSummary = new HotNodeSummary(instanceDetails.getInstanceId(),
+          instanceDetails.getInstanceIp());
+      ResourceFlowUnit<HotNodeSummary> rfu;
+      if (indexShardKey != null) {
+        HotShardSummary summary = new HotShardSummary(
+            indexShardKey.getIndexName(),
+            String.valueOf(indexShardKey.getShardId()),
+            instanceDetails.getInstanceId(),
+            0);
+        summary.setcpuUtilization(maxCpu);
+        nodeSummary.appendNestedSummary(summary);
+        rfu = new ResourceFlowUnit<>(
+            System.currentTimeMillis(),
+            new ResourceContext(Resources.State.UNHEALTHY),
+            nodeSummary,
+            true);
+      } else {
+        rfu = new ResourceFlowUnit<>(System.currentTimeMillis());
+      }
+      return rfu;
+    }
+  }
+
+  /** Picks, among the node level summaries, the node whose hot shard uses the most CPU. */
+  public static class ClusterRca extends Rca<ResourceFlowUnit<HotClusterSummary>> {
+    private final NodeRca nodeRca;
+
+    public ClusterRca(NodeRca nodeRca) {
+      super(1);
+      this.nodeRca = nodeRca;
+    }
+
+    @Override
+    public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
+      throw new IllegalArgumentException(name() + "'s generateFlowUnitListFromWire() should not "
+          + "be required.");
+    }
+
+    // The cluster level RCA goes through all the nodeLevel summaries and then picks the node
+    // with the highest CPU and states which shard it is the highest for.
+    @Override
+    public ResourceFlowUnit<HotClusterSummary> operate() {
+      final List<ResourceFlowUnit<HotNodeSummary>> resourceFlowUnits = nodeRca.getFlowUnits();
+
+      String hotNodeId = "";
+      String hotNodeAddr = "";
+      String hotShard = "";
+      String hotShardIndex = "";
+      double cpuUtilization = 0.0;
+
+      for (final ResourceFlowUnit<HotNodeSummary> resourceFlowUnit : resourceFlowUnits) {
+        if (resourceFlowUnit.isEmpty()) {
+          continue;
+        }
+        HotNodeSummary nodeSummary = resourceFlowUnit.getSummary();
+        List<HotShardSummary> hotShardSummaries = nodeSummary.getHotShardSummaryList();
+        if (hotShardSummaries == null || hotShardSummaries.isEmpty()) {
+          // A node summary without a nested shard summary has nothing to compare.
+          continue;
+        }
+        HotShardSummary hotShardSummary = hotShardSummaries.get(0);
+        double cpu = hotShardSummary.getCpuUtilization();
+        if (cpu > cpuUtilization) {
+          hotNodeId = nodeSummary.getNodeID();
+          hotNodeAddr = nodeSummary.getHostAddress();
+          hotShard = hotShardSummary.getShardId();
+          hotShardIndex = hotShardSummary.getIndexName();
+          cpuUtilization = cpu;
+        }
+      }
+
+      if (hotNodeId.isEmpty()) {
+        return new ResourceFlowUnit<>(System.currentTimeMillis());
+      }
+
+      HotClusterSummary hotClusterSummary = new HotClusterSummary(
+          getAllClusterInstances().size(), 1);
+      HotNodeSummary hotNodeSummary = new HotNodeSummary(hotNodeId, hotNodeAddr);
+      HotShardSummary hotShardSummary =
+          new HotShardSummary(hotShardIndex, hotShard, hotNodeId, 0);
+      hotShardSummary.setcpuUtilization(cpuUtilization);
+      hotNodeSummary.appendNestedSummary(hotShardSummary);
+      hotClusterSummary.appendNestedSummary(hotNodeSummary);
+
+      return new ResourceFlowUnit<>(
+          System.currentTimeMillis(),
+          new ResourceContext(Resources.State.UNHEALTHY),
+          hotClusterSummary, true);
+    }
+  }
+}
diff --git a/src/test/resources/rca/rca_elected_master.conf b/src/test/resources/rca/rca_elected_master.conf index d6ddaefe7..674a4718f 100644 --- a/src/test/resources/rca/rca_elected_master.conf +++ b/src/test/resources/rca/rca_elected_master.conf @@ -40,7 +40,8 @@ // How often the sqlite file be repeated in seconds. This file contains RCAs and therefore rotating it too frequently // might not be as fruitful as there might not be any data. - "rotation-period-seconds": 21600 + "rotation-period-seconds": 21600, + "storage-file-retention-count": 5 }, "rca-config-settings": { diff --git a/src/test/resources/rca/rca_master.conf b/src/test/resources/rca/rca_master.conf index d645178bb..314130957 100644 --- a/src/test/resources/rca/rca_master.conf +++ b/src/test/resources/rca/rca_master.conf @@ -40,7 +40,8 @@ // How often the sqlite file be repeated in seconds. This file contains RCAs and therefore rotating it too frequently // might not be as fruitful as there might not be any data. - "rotation-period-seconds": 21600 + "rotation-period-seconds": 21600, + "storage-file-retention-count": 5 }, "rca-config-settings": {