diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7240c1f86..586c02d8f 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -61,6 +61,11 @@ e.g. ./gradlew test --tests RCASchedulerTaskTests.getLocallyExecutableNodes ``` +### Adding your own tests +- You can add a unit tests using the junit framework +- There is also a mechanism to add integration tests for the RCA framework. For details, please + see [here](docs/rcait.md). + Before submitting the PR, we request that you also run ```shell script ./gradlew build diff --git a/docs/rcait.md b/docs/rcait.md new file mode 100644 index 000000000..c9582cd5c --- /dev/null +++ b/docs/rcait.md @@ -0,0 +1,97 @@ +# Rca Integration test framework + +## Scope +To be able to test scenarios where multiple RCA Schedulers are running on different hosts of a +cluster and be able to inject a custom RCA graph, specify the metrics that should flow through +the rca-dataflow-graph and then be able to test the results of the RCAs and decisions based on +the RCAs by either querying the rca.sqlite files on a particular host or by hitting the REST +endpoint on a particular host. + +## Out of Scope +This framework will not facilitate testing the PerformanceAnalyzer Reader component, writer +component and how PerformanceAnalyzer interacts with ElasticSearch. + +## How to write your own tests using this framework ? +The RCA-IT is composed of various annotatons that you can use to configure the +test environment you want your tests to run on. + +`__@RunWith(RcaItNotEncryptedRunner.class)__` + +The above specifies the runner for the junit test class and in this case, it says to junit +to offload it to one of the RCA-IT runners - _RcaItNotEncryptedRunner_. All RCA-IT tests must +use this annotation for them to be run by this integ test framework. + +`__@AClusterType(ClusterType.MULTI_NODE_CO_LOCATED_MASTER)__` + +This annotation tells the RCA-IT to use `a multi-node cluster with no dedicated master nodes +`. The kinds of clusters supported today are: `SINGLE_NODE`, `MULTI_NODE_CO_LOCATED_MASTER +` and `MULTI_NODE_DEDICATED_MASTER`. This is a required annotation and must be specified at + the class level. + +`__@ARcaGraph(MyRcaGraph.class)__` + +This helps us specify the Graph that we will be using for this test. It can be a graph that +exists or the one specially contrived for this test. + +`__@AMetric__` + +This helps us specify the metrics that will be pured over the RCA graph. It has multiple sub +-fields. +- _name_ : The metric we are filling in. The expected parameter is one of the metrics classes + in `com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics`. The + metrics class that you specify, should have a `static final` field called `NAME` (`CPU_Utilization`) + and that will be used to determine the metric table. +- _dimensionNames_ : For the dimension names for a metric, please refer to the docs +[here](https://opendistro.github.io/for-elasticsearch-docs/docs/pa/reference/). +- _tables_ : This specifies a table for the metric. The table should be a 5 second snapshot + of the metrics, similar to what exists in metricsdb files. The table is an array type + , therfore it gives you the flexibility of specifying a different metrics table for + different nodes in the cluster. This can be used to push different metrics to the node that + we want to be marked unhealthy vs all other nodes in the cluster. + - _hostTag_ : On which node of the cluster, should this metric table be emitted. + - _tuple_ : This is an array type that can be used to specify the rows in the table. A + row should be an n-tuple where n is the number of dimension this metrics has added to + the 4 aggregate columns that all metricsdb files has - `min`, `max`, `sum` and `avg`. + +`__@Expect__` + +This is an optional annotation that can be used only at a method level. This provides an easy +way to validate the result of the test. The annotation has 4 sub-fields: +- what : What are we testing for - data in rca.sqlite file or the response returned by the + rest API. +- on : On which node should the framework look for, for the expected data. +- forRca : Which particular RCA's value are we testing. +- validator : This is the class that should be defined by the test writer and should + implement `IValidator` interface. Once the framework gathers the data for the mentioned RCA + from the given node, the data will be passed to the validator which returns if the check + passed or not. + +The Expect annotation is a repeatable type. Therefore, you can expect multiple things from +the test at steady-state. So you can have two expectations one for the RCA on data node and +the other on the master. If the expectations are not true for the ith iteration, then the +framework, will re-run them for the i+1 the iteration till a timeout. The timeout is +configurable to any number of seconds using the field `timeoutSeconds` but has the default +of 60 seconds. + +A test class can get access to the programmaticAPI to get information about hosts in the cluster +or a particular host then the test class can declare a method with name `setTestApi(final TestApi api)` +and the test runer will call this setter to give a reference of the TEestApi to the testClass. + +## Framework deep dive. +This section might be of interest to you if you are trying to enhance the test framework itself +. If you just want to add more integration tests, then you may choose to skip this section. + +The framework consists of four main classes: +1. `RcaItRunnerBase` : This is the JUnit Runner that will be used to run all rca-it tests. It + orchestrates the environment creation, initializing the test class and then executing the methods + annotated with `@Test`. + +2. `TestEnvironment` : The RCA-IT environment is defined by the RCA graph it will be running, the + metrics that will flow through the dataflow pipelines and the rca.conf that will be used by the + hosts. + +3. `Cluster` and `Host` classes: These class initializes multiple RCAController(s) threads, +each of them represent RCAFramework running on multiple nodes. In constructors, we create all the +objects and create a directory per host where they will dump the _rca.sqlite_ and _rca_enabled_ +files. In the second phase a call to `createServersAndThreads` is made which creates all the http +and grpc servers (one per host). Then we start the RCAcontroller thread. diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerApp.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerApp.java index 448fec44c..a9f37d8d9 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerApp.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerApp.java @@ -28,6 +28,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.NetClient; import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.NetServer; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaController; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.MetricsDBProvider; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.JvmMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics; @@ -136,14 +137,21 @@ private static void startRcaTopLevelThread(final ClientServers clientServers, Util.DATA_DIR, RcaConsts.RCA_STATE_CHECK_INTERVAL_IN_MS, RcaConsts.nodeRolePollerPeriodicityInSeconds * 1000, - appContext + appContext, new MetricsDBProvider() ); startRcaTopLevelThread(rcaController, threadProvider); } - public static Thread startRcaTopLevelThread(final RcaController rcaController1, final ThreadProvider threadProvider) { + public static Thread startRcaTopLevelThread(final RcaController rcaController1, + final ThreadProvider threadProvider) { + return startRcaTopLevelThread(rcaController1, threadProvider, ""); + } + + public static Thread startRcaTopLevelThread(final RcaController rcaController1, + final ThreadProvider threadProvider, + String nodeName) { Thread rcaControllerThread = threadProvider.createThreadForRunnable(() -> rcaController1.run(), - PerformanceAnalyzerThreads.RCA_CONTROLLER); + PerformanceAnalyzerThreads.RCA_CONTROLLER, nodeName); rcaControllerThread.start(); return rcaControllerThread; } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerWebServer.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerWebServer.java index 19119d39e..2b2e66c89 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerWebServer.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerWebServer.java @@ -63,6 +63,7 @@ public static HttpServer createInternalServer(String portFromSetting, String hos server.setExecutor(Executors.newCachedThreadPool()); return server; } catch (java.net.BindException ex) { + LOG.error("Could not create HttpServer on port {}", internalPort, ex); Runtime.getRuntime().halt(1); } catch (Exception ex) { ex.printStackTrace(); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java index e4e425462..30df50680 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java @@ -64,6 +64,7 @@ import java.util.Objects; import java.util.Scanner; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -88,6 +89,8 @@ public class RcaController { private boolean rcaEnabledDefaultValue = false; + private final int WAIT_FOR_SCHED_START_SECS = 10; + // This needs to be volatile as the RcaConfPoller writes it but the Nanny reads it. private volatile boolean rcaEnabled = false; @@ -96,6 +99,7 @@ public class RcaController { // This needs to be volatile as the NodeRolePoller writes it but the Nanny reads it. protected volatile NodeRole currentRole = NodeRole.UNKNOWN; + private volatile List connectedComponents; private final ThreadProvider threadProvider; private RCAScheduler rcaScheduler; @@ -124,7 +128,9 @@ public class RcaController { private final AppContext appContext; - protected Queryable dbProvider = null; + protected volatile Queryable dbProvider = null; + + private volatile Persistable persistenceProvider; public RcaController( final ThreadProvider threadProvider, @@ -134,7 +140,8 @@ public RcaController( final String rca_enabled_conf_location, final long rcaStateCheckIntervalMillis, final long nodeRoleCheckPeriodicityMillis, - final AppContext appContext) { + final AppContext appContext, + final Queryable dbProvider) { this.threadProvider = threadProvider; this.appContext = appContext; this.netOpsExecutorService = netOpsExecutorService; @@ -151,6 +158,9 @@ public RcaController( this.rcaStateCheckIntervalMillis = rcaStateCheckIntervalMillis; this.roleCheckPeriodicity = nodeRoleCheckPeriodicityMillis; this.deliberateInterrupt = false; + this.connectedComponents = null; + this.dbProvider = dbProvider; + this.persistenceProvider = null; } @VisibleForTesting @@ -162,6 +172,7 @@ public RcaController() { rcaStateCheckIntervalMillis = 0; roleCheckPeriodicity = 0; appContext = null; + this.persistenceProvider = null; } protected List getRcaGraphComponents( @@ -177,25 +188,22 @@ private void start() { try { Objects.requireNonNull(subscriptionManager); Objects.requireNonNull(rcaConf); + if (dbProvider == null) { + return; + } subscriptionManager.setCurrentLocus(rcaConf.getTagMap().get("locus")); - List connectedComponents = getRcaGraphComponents(rcaConf); + this.connectedComponents = getRcaGraphComponents(rcaConf); // Mute the rca nodes after the graph creation and before the scheduler start readAndUpdateMutedComponentsDuringStart(); - Queryable db; - if (dbProvider == null) { - db = new MetricsDBProvider(); - } else { - db = dbProvider; - } ThresholdMain thresholdMain = new ThresholdMain(RcaConsts.THRESHOLDS_PATH, rcaConf); - Persistable persistable = PersistenceFactory.create(rcaConf); + persistenceProvider = PersistenceFactory.create(rcaConf); networkThreadPoolReference .set(RcaControllerHelper.buildNetworkThreadPool(rcaConf.getNetworkQueueLength())); addRcaRequestHandler(); - queryRcaRequestHandler.setPersistable(persistable); + queryRcaRequestHandler.setPersistable(persistenceProvider); receivedFlowUnitStore = new ReceivedFlowUnitStore(rcaConf.getPerVertexBufferLength()); WireHopper net = new WireHopper(nodeStateManager, rcaNetClient, subscriptionManager, @@ -208,10 +216,10 @@ private void start() { this.rcaScheduler = new RCAScheduler(connectedComponents, - db, + dbProvider, rcaConf, thresholdMain, - persistable, + persistenceProvider, net, copyAppContext); @@ -221,9 +229,21 @@ private void start() { new SubscribeServerHandler(subscriptionManager, networkThreadPoolReference)); Thread rcaSchedulerThread = threadProvider.createThreadForRunnable(() -> rcaScheduler.start(), - PerformanceAnalyzerThreads.RCA_SCHEDULER); + PerformanceAnalyzerThreads.RCA_SCHEDULER, + copyAppContext.getMyInstanceDetails().getInstanceId().toString()); + CountDownLatch schedulerStartLatch = new CountDownLatch(1); + rcaScheduler.setSchedulerTrackingLatch(schedulerStartLatch); rcaSchedulerThread.start(); + if (!schedulerStartLatch.await(WAIT_FOR_SCHED_START_SECS, TimeUnit.SECONDS)) { + LOG.error("Failed to start RcaScheduler."); + throw new IllegalStateException( + "Failed to start RcaScheduler within " + WAIT_FOR_SCHED_START_SECS + " seconds."); + } + + if (rcaScheduler.getState() != RcaSchedulerState.STATE_STARTED) { + LOG.error("RCA scheduler didn't start within {} seconds", WAIT_FOR_SCHED_START_SECS); + } } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException @@ -232,8 +252,9 @@ private void start() { | MalformedConfig | SQLException | IOException e) { - LOG.error("Couldn't build connected components or persistable.. Ran into {}", e.getMessage()); - e.printStackTrace(); + LOG.error("Couldn't build connected components or persistable..", e); + } catch (Exception ex) { + LOG.error("Couldn't start RcaController", ex); } } @@ -262,6 +283,10 @@ private void restart() { StatsCollector.instance().logMetric(RcaConsts.RCA_SCHEDULER_RESTART_METRIC); } + protected RcaConf getRcaConfForMyRole(NodeRole role) { + return RcaControllerHelper.pickRcaConfForRole(role); + } + public void run() { long tick = 0; long nodeRoleCheckInTicks = roleCheckPeriodicity / rcaStateCheckIntervalMillis; @@ -279,7 +304,7 @@ public void run() { // If RCA is enabled, update Analysis graph with Muted RCAs value if (rcaEnabled) { - rcaConf = RcaControllerHelper.pickRcaConfForRole(currentRole); + rcaConf = getRcaConfForMyRole(currentRole); LOG.debug("Updating Analysis Graph with Muted RCAs"); readAndUpdateMutedComponents(); } @@ -290,14 +315,17 @@ public void run() { Thread.sleep(rcaStateCheckIntervalMillis - duration); } } catch (InterruptedException ie) { - if (!deliberateInterrupt) { - LOG.error("RCA controller thread was interrupted. Reason: {}", ie.getMessage()); - LOG.error(ie); + if (deliberateInterrupt) { + // This should only happen in case of tests. So, its okay for this log level to be info. + LOG.info("RcaController thread interrupted.."); + } else { + LOG.error("RCA controller thread was interrupted.", ie); } break; } tick++; } + LOG.error("RcaController exits.."); } /** @@ -310,7 +338,12 @@ private void readRcaEnabledFromConf() { () -> { try (Scanner sc = new Scanner(filePath)) { String nextLine = sc.nextLine(); - rcaEnabled = Boolean.parseBoolean(nextLine); + boolean oldVal = rcaEnabled; + boolean newVal = Boolean.parseBoolean(nextLine); + if (oldVal != newVal) { + rcaEnabled = newVal; + LOG.info("RCA enabled changed from {} to {}", oldVal, newVal); + } } catch (IOException e) { LOG.error("Error reading file '{}': {}", filePath.toString(), e); e.printStackTrace(); @@ -336,7 +369,8 @@ private void readAndUpdateMutedComponentsDuringStart() { private boolean updateMutedComponents() { try { - if (ConnectedComponent.getNodeNames().isEmpty()) { + Set allNodes = ConnectedComponent.getNodesForAllComponents(this.connectedComponents); + if (allNodes.isEmpty()) { LOG.info("Analysis graph not initialized/has been reset; returning."); return false; } @@ -350,7 +384,7 @@ private boolean updateMutedComponents() { LOG.info("Actions provided for muting: {}", actionsForMute); // Update rcasForMute to retain only valid RCAs - graphNodesForMute.retainAll(ConnectedComponent.getNodeNames()); + graphNodesForMute.retainAll(allNodes); // If rcasForMute post validation is empty but neither rcaConf.getMutedRcaList() nor // rcaConf.getMutedDeciderList() are empty all the input RCAs/deciders are incorrect. @@ -359,11 +393,11 @@ private boolean updateMutedComponents() { if (lastModifiedTimeInMillisInMemory == 0) { LOG.error( "Removing Incorrect RCA(s): {} provided before RCA Scheduler start. Valid RCAs: {}.", - rcaConf.getMutedRcaList(), ConnectedComponent.getNodeNames()); + rcaConf.getMutedRcaList(), allNodes); } else { LOG.error("Incorrect RCA(s): {}, cannot be muted. Valid RCAs: {}, Muted RCAs: {}", - rcaConf.getMutedRcaList(), ConnectedComponent.getNodeNames(), + rcaConf.getMutedRcaList(), allNodes, Stats.getInstance().getMutedGraphNodes()); return false; } @@ -480,4 +514,13 @@ public void setDbProvider(Queryable dbProvider) { this.dbProvider = dbProvider; } + @VisibleForTesting + public List getConnectedComponents() { + return connectedComponents; + } + + @VisibleForTesting + public Persistable getPersistenceProvider() { + return persistenceProvider; + } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConfJsonWrapper.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConfJsonWrapper.java index 1d46fcff1..0b0b77fe7 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConfJsonWrapper.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConfJsonWrapper.java @@ -28,8 +28,22 @@ // TODO: There should be a validation for the expected fields. @JsonIgnoreProperties(ignoreUnknown = true) class ConfJsonWrapper { + public static final String RCA_STORE_LOC = "rca-store-location"; + public static final String THRESHOLD_STORE_LOC = "threshold-store-location"; + public static final String NEW_RCA_CHECK_MINS = "new-rca-check-minutes"; + public static final String NEW_THRESHOLDS_CHECK_MINS = "new-threshold-check-minutes"; + public static final String TAGS = "tags"; + public static final String REMOTE_PEERS = "remote-peers"; + public static final String DATASTORE = "datastore"; + public static final String ANALYSIS_GRAPH_IMPL = "analysis-graph-implementor"; + public static final String NETWORK_QUEUE_LEN = "network-queue-length"; + public static final String MAX_FLOW_UNIT_PER_VERTEX = "max-flow-units-per-vertex-buffer"; + public static final String RCA_CONFIG_SETTINGS = "rca-config-settings"; + public static final String MUTED_RCAS = "muted-rcas"; + public static final String MUTED_DECIDERS = "muted-deciders"; + public static final String MUTED_ACTIONS = "muted-actions"; + public static final String DECIDER_CONFIG_SETTINGS = "decider-config-settings"; - private static final Logger LOG = LogManager.getLogger(ConfJsonWrapper.class); private final String rcaStoreLoc; private final String thresholdStoreLoc; private final long newRcaCheckPeriodicityMins; @@ -112,21 +126,21 @@ Map getDeciderConfigSettings() { } ConfJsonWrapper( - @JsonProperty("rca-store-location") String rcaStoreLoc, - @JsonProperty("threshold-store-location") String thresholdStoreLoc, - @JsonProperty("new-rca-check-minutes") long newRcaCheckPeriodicityMins, - @JsonProperty("new-threshold-check-minutes") long newThresholdCheckPeriodicityMins, - @JsonProperty("tags") Map tags, - @JsonProperty("remote-peers") List peers, - @JsonProperty("datastore") Map datastore, - @JsonProperty("analysis-graph-implementor") String analysisGraphEntryPoint, - @JsonProperty("network-queue-length") int networkQueueLength, - @JsonProperty("max-flow-units-per-vertex-buffer") int perVertexBufferLength, - @JsonProperty("rca-config-settings") Map rcaConfigSettings, - @JsonProperty("muted-rcas") List mutedRcas, - @JsonProperty("muted-deciders") List mutedDeciders, - @JsonProperty("muted-actions") List mutedActions, - @JsonProperty("decider-config-settings") Map deciderConfigSettings) { + @JsonProperty(RCA_STORE_LOC) String rcaStoreLoc, + @JsonProperty(THRESHOLD_STORE_LOC) String thresholdStoreLoc, + @JsonProperty(NEW_RCA_CHECK_MINS) long newRcaCheckPeriodicityMins, + @JsonProperty(NEW_THRESHOLDS_CHECK_MINS) long newThresholdCheckPeriodicityMins, + @JsonProperty(TAGS) Map tags, + @JsonProperty(REMOTE_PEERS) List peers, + @JsonProperty(DATASTORE) Map datastore, + @JsonProperty(ANALYSIS_GRAPH_IMPL) String analysisGraphEntryPoint, + @JsonProperty(NETWORK_QUEUE_LEN) int networkQueueLength, + @JsonProperty(MAX_FLOW_UNIT_PER_VERTEX) int perVertexBufferLength, + @JsonProperty(RCA_CONFIG_SETTINGS) Map rcaConfigSettings, + @JsonProperty(MUTED_RCAS) List mutedRcas, + @JsonProperty(MUTED_DECIDERS) List mutedDeciders, + @JsonProperty(MUTED_ACTIONS) List mutedActions, + @JsonProperty(DECIDER_CONFIG_SETTINGS) Map deciderConfigSettings) { this.creationTime = System.currentTimeMillis(); this.rcaStoreLoc = rcaStoreLoc; this.thresholdStoreLoc = thresholdStoreLoc; diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConnectedComponent.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConnectedComponent.java index 016bf117b..6f0a65d7a 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConnectedComponent.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/ConnectedComponent.java @@ -29,7 +29,7 @@ public class ConnectedComponent { /* The elements in the inner list can be executed in parallel. Two inner lists have to be executed in order. */ private List>> dependencyOrderedNodes; - private static Set nodeNames = new HashSet<>(); + private Set nodeNames = new HashSet<>(); private int graphId; public Set> getAllNodes() { @@ -117,8 +117,18 @@ public List>> getAllNodesByDependencyOrder() { return dependencyOrderedNodes; } - public static Set getNodeNames() { + public Set getNodeNames() { return nodeNames; } + public static Set getNodesForAllComponents(List connectedComponentList) { + Set allNodes = new HashSet<>(); + + if (connectedComponentList != null) { + for (ConnectedComponent connectedComponent : connectedComponentList) { + allNodes.addAll(connectedComponent.getNodeNames()); + } + } + return allNodes; + } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileRotate.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileRotate.java index 35ea40b4d..a76a2c7ae 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileRotate.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileRotate.java @@ -135,8 +135,9 @@ protected synchronized Path rotate(long currentMillis) throws IOException { // try 4: If the delete fails, all bets are off, throw an exception and let the caller decide. try { ret = Files.move(FILE_TO_ROTATE, targetFilePath, StandardCopyOption.ATOMIC_MOVE); + LOG.info("## File rotated successfully to : {}", targetFilePath); } catch (FileAlreadyExistsException fae) { - LOG.error("Deleting file '{}' or else we cannot rotate the current {}", targetFilePath, FILE_TO_ROTATE); + LOG.error("**Deleting file '{}' or else we cannot rotate the current {}", targetFilePath, FILE_TO_ROTATE); if (!Files.deleteIfExists(targetFilePath)) { LOG.error("Could not delete file: " + targetFilePath); } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/Persistable.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/Persistable.java index 8293c68ca..bc50e6470 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/Persistable.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/Persistable.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.sql.SQLException; import java.util.List; +import org.jooq.Record; +import org.jooq.Result; public interface Persistable { /** @@ -54,4 +56,10 @@ public interface Persistable { void write(Node node, T flowUnit) throws SQLException, IOException; void close() throws SQLException; + + /** + * Get a list of all the distinct RCAs persisted in the current DB file. + * @return A list of RCAs. + */ + List getAllPersistedRcas(); } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java index aa6614913..b659aaa3c 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java @@ -37,6 +37,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jooq.Field; +import org.jooq.Record; +import org.jooq.Result; import org.jooq.exception.DataAccessException; // TODO: Scheme to rotate the current file and garbage collect older files. @@ -113,6 +115,8 @@ abstract void createTable( abstract void createNewDSLContext(); + public abstract List> getRecordsForAllTables(); + // Not required for now. @Override public synchronized List read(Node node) { @@ -146,7 +150,6 @@ private synchronized void openNewDBFile() throws SQLException { String url = String.format("%s%s", this.dbProtocol, this.filename); close(); conn = DriverManager.getConnection(url); - LOG.info("RCA: Periodic File Rotation - Created a new database connection - " + url); createNewDSLContext(); } @@ -201,6 +204,7 @@ private synchronized void rotateRegisterGarbageThenCreateNewDB(RotationType type // the current DBFile does not exist anymore. We therefore should create a new one. if (fileRotate.getLastRotatedMillis() == currTime) { openNewDBFile(); + LOG.info("Created a new DB file."); } } @@ -214,19 +218,19 @@ private synchronized void rotateRegisterGarbageThenCreateNewDB(RotationType type * corrupted. * @throws IOException This is thrown if the attempt to create a new DB file fails. */ - private void writeFlowUnit( + private synchronized void writeFlowUnit( T flowUnit, String tableName) throws SQLException, IOException { try { tryWriteFlowUnit(flowUnit, tableName); } catch (SQLException | DataAccessException e) { LOG.info( - "RCA: Fail to write to table '{}', creating a new DB file and retrying write/create operation", tableName); + "RCA: Fail to write to table '{}', creating a new DB file and retrying write/create operation", tableName, e); rotateRegisterGarbageThenCreateNewDB(RotationType.FORCE_ROTATE); tryWriteFlowUnit(flowUnit, tableName); } } - private void tryWriteFlowUnit( + private synchronized void tryWriteFlowUnit( T flowUnit, String nodeName) throws SQLException, DataAccessException { String tableName = ResourceFlowUnit.RCA_TABLE_NAME; if (!tableNames.contains(tableName)) { @@ -245,14 +249,14 @@ private void tryWriteFlowUnit( } /** recursively insert nested summary to sql tables */ - private void writeSummary( + private synchronized void writeSummary( GenericSummary summary, String referenceTable, String referenceTablePrimaryKeyFieldName, int referenceTablePrimaryKeyFieldValue) throws SQLException { String tableName = summary.getClass().getSimpleName(); if (!tableNames.contains(tableName)) { - LOG.info("RCA: Table '{}' does not exist. Creating one with columns: {}", tableName, summary.getSqlSchema()); + LOG.info("RCA: Summary table '{}' does not exist. Creating one with columns: {}", tableName, summary.getSqlSchema()); createTable(tableName, summary.getSqlSchema(), referenceTable, referenceTablePrimaryKeyFieldName); } List values = summary.getSqlValue(); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java index e154bd1d9..479610882 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java @@ -33,12 +33,18 @@ import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.sql.SQLException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nullable; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jooq.CreateTableConstraintStep; @@ -47,6 +53,7 @@ import org.jooq.InsertValuesStepN; import org.jooq.JSONFormat; import org.jooq.Record; +import org.jooq.Record1; import org.jooq.Result; import org.jooq.SQLDialect; import org.jooq.SelectJoinStep; @@ -75,6 +82,9 @@ class SQLitePersistor extends PersistorBase { // It is needed during SQLite file rotation @Override synchronized void createNewDSLContext() { + if (create != null) { + create.close(); + } create = DSL.using(super.conn, SQLDialect.SQLITE); jooqTableColumns = new HashMap<>(); } @@ -187,6 +197,28 @@ synchronized String readTables() { return tablesObject.toString(); } + public synchronized List> getRecordsForAllTables() { + List> results = new ArrayList<>(); + super.tableNames.forEach( + table -> results.add(getRecords(table)) + ); + return results; + } + + @Override + public synchronized List getAllPersistedRcas() { + List uniquePersistedRcas = new ArrayList<>(); + try { + uniquePersistedRcas = + (List) create.selectDistinct(ResourceFlowUnitFieldValue.RCA_NAME_FILELD.getField()) + .from(ResourceFlowUnit.RCA_TABLE_NAME) + .fetch(0).stream().collect(Collectors.toList()); + } catch (DataAccessException dex) { + + } + return uniquePersistedRcas; + } + //read table content and convert it into JSON format private synchronized String readTable(String tableName) { String tableStr; @@ -208,6 +240,24 @@ private synchronized String readTable(String tableName) { return tableStr; } + private synchronized @Nullable Result getRecords(String tableName) { + try { + Result result; + if (tableName.equals(ResourceFlowUnit.RCA_TABLE_NAME)) { + result = create.select() + .from(tableName) + .orderBy(ResourceFlowUnitFieldValue.RCA_NAME_FILELD.getField()) + .fetch(); + } else { + result = create.select().from(tableName).fetch(); + } + return result; + } catch (DataAccessException e) { + LOG.error("Fail to read table {}", tableName); + } + return null; + } + /** * FullTemperatureSummary is not one single Rca, instead it is a conglomeration of * temperature across all dimensions. Therefore, it iterates over all the dimensional tables @@ -286,8 +336,7 @@ private synchronized void readSummary(GenericSummary upperLevelSummary, int uppe } } catch (DataAccessException de) { // it is totally fine if we fail to read some certain tables as some types of summaries might be missing - LOG.warn("Fail to read Summary table : {}, query = {}, exceptions : {}", - nestedTableName, rcaQuery.toString(), de.getStackTrace()); + LOG.warn("Fail to read Summary table : {}, query = {}", nestedTableName, rcaQuery.toString(), de); } catch (IllegalArgumentException ie) { LOG.error("Reading nested summary from wrong table, message : {}", ie.getMessage()); } @@ -321,8 +370,10 @@ private synchronized JsonElement getNonTemperatureRcas(String rca) { } } } catch (DataAccessException de) { - // it is totally fine if we fail to read some certain tables. - LOG.warn("Fail to read RCA : {}, query = {}, exceptions : {}", rca, rcaQuery.toString(), de.getStackTrace()); + if (!de.getMessage().contains("no such table")) { + // it is totally fine if we fail to read some certain tables. + LOG.error("Fail to read RCA : {}.", rca, de); + } } JsonElement ret = null; if (response != null) { diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java index 423518e23..77b9bf576 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.sql.SQLException; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -54,18 +55,16 @@ public class RCAScheduler { private final NodeRole role; private final AppContext appContext; - final ThreadFactory schedThreadFactory = - new ThreadFactoryBuilder().setNameFormat("sched-%d").setDaemon(true).build(); + final ThreadFactory schedThreadFactory; // TODO: Fix number of threads based on config. - final ThreadFactory taskThreadFactory = - new ThreadFactoryBuilder().setNameFormat("task-%d-").setDaemon(true).build(); + final ThreadFactory taskThreadFactory; ExecutorService rcaSchedulerPeriodicExecutor; ScheduledExecutorService scheduledPool; List connectedComponents; - Queryable db; + volatile Queryable db; RcaConf rcaConf; ThresholdMain thresholdMain; Persistable persistable; @@ -74,6 +73,8 @@ public class RCAScheduler { private static final Logger LOG = LogManager.getLogger(RCAScheduler.class); + private CountDownLatch schedulerTrackingLatch; + public RCAScheduler( List connectedComponents, Queryable db, @@ -82,6 +83,18 @@ public RCAScheduler( Persistable persistable, WireHopper net, final AppContext appContext) { + String instanceId = appContext.getMyInstanceDetails().getInstanceId().toString(); + this.schedThreadFactory = new ThreadFactoryBuilder() + .setNameFormat(instanceId + "-sched-%d") + .setDaemon(true) + .build(); + + // TODO: Fix number of threads based on config. + this.taskThreadFactory = new ThreadFactoryBuilder() + .setNameFormat(instanceId + "-task-%d-") + .setDaemon(true) + .build(); + this.connectedComponents = connectedComponents; this.db = db; this.rcaConf = rcaConf; @@ -101,10 +114,16 @@ public void start() { if (scheduledPool == null) { LOG.error("Couldn't start RCA scheduler. Executor pool is not set."); + if (schedulerTrackingLatch != null) { + schedulerTrackingLatch.countDown(); + } return; } if (role == NodeRole.UNKNOWN) { LOG.error("Couldn't start RCA scheduler as the node role is UNKNOWN."); + if (schedulerTrackingLatch != null) { + schedulerTrackingLatch.countDown(); + } return; } @@ -119,7 +138,10 @@ public void start() { appContext); schedulerState = RcaSchedulerState.STATE_STARTED; - LOG.info("RCA scheduler thread started successfully."); + LOG.info("RCA scheduler thread started successfully on node: {}", appContext.getMyInstanceDetails().getInstanceId()); + if (schedulerTrackingLatch != null) { + schedulerTrackingLatch.countDown(); + } while (schedulerState == RcaSchedulerState.STATE_STARTED) { try { @@ -149,6 +171,7 @@ public void shutdown() { LOG.info("Shutting down the scheduler.."); shutdownRequested = true; scheduledPool.shutdown(); + waitForShutdown(scheduledPool); rcaSchedulerPeriodicExecutor.shutdown(); waitForShutdown(rcaSchedulerPeriodicExecutor); try { @@ -158,6 +181,9 @@ public void shutdown() { "RCA: Error while closing the DB connection: {}::{}", e.getErrorCode(), e.getCause()); } schedulerState = RcaSchedulerState.STATE_STOPPED; + if (schedulerTrackingLatch != null) { + schedulerTrackingLatch.countDown(); + } } private void waitForShutdown(ExecutorService execPool) { @@ -184,6 +210,10 @@ public NodeRole getRole() { return role; } + public void setSchedulerTrackingLatch(final CountDownLatch schedulerTrackingLatch) { + this.schedulerTrackingLatch = schedulerTrackingLatch; + } + @VisibleForTesting public void setQueryable(Queryable queryable) { this.db = queryable; diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ClusterDetailsEventProcessor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ClusterDetailsEventProcessor.java index 4dddfd1f1..c945a7180 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ClusterDetailsEventProcessor.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ClusterDetailsEventProcessor.java @@ -186,14 +186,6 @@ public NodeDetails(AllMetrics.NodeRole role, String id, String hostAddress, bool this(role, id, hostAddress, isMaster, Util.RPC_PORT); } - public NodeDetails(AllMetrics.NodeRole role, String id, String hostAddress, boolean isMaster, int grpcPort) { - this.id = id; - this.hostAddress = hostAddress; - this.isMasterNode = isMaster; - this.role = role.toString(); - this.grpcPort = grpcPort; - } - public NodeDetails(final NodeDetails other) { if (other != null) { this.id = other.id; @@ -203,6 +195,14 @@ public NodeDetails(final NodeDetails other) { } } + public NodeDetails(AllMetrics.NodeRole role, String id, String hostAddress, boolean isMaster, int grpcPort) { + this.role = role.toString(); + this.id = id; + this.hostAddress = hostAddress; + this.isMasterNode = isMaster; + this.grpcPort = grpcPort; + } + @Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rest/QueryRcaRequestHandler.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rest/QueryRcaRequestHandler.java index 567f62eb6..5027495ab 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rest/QueryRcaRequestHandler.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rest/QueryRcaRequestHandler.java @@ -162,13 +162,20 @@ public void handle(HttpExchange exchange) throws IOException { private void handleClusterRcaRequest(Map params, HttpExchange exchange) throws IOException { + //check if we are querying from elected master + if (!validNodeRole()) { + JsonObject errResponse = new JsonObject(); + errResponse.addProperty("error", "Node being queried is not elected master."); + sendResponse(exchange, errResponse.toString(), + HttpURLConnection.HTTP_BAD_REQUEST); + return; + } List rcaList = metricsRestUtil.parseArrayParam(params, NAME_PARAM, true); // query all cluster level RCAs if no RCA is specified in name. if (rcaList.isEmpty()) { - rcaList = SQLiteQueryUtils.getClusterLevelRca(); - } - //check if RCA is valid - if (!validParams(rcaList)) { + // rcaList = SQLiteQueryUtils.getClusterLevelRca(); + rcaList = persistable.getAllPersistedRcas(); + } else if (!validParams(rcaList)) { JsonObject errResponse = new JsonObject(); JsonArray errReason = new JsonArray(); SQLiteQueryUtils.getClusterLevelRca().forEach(errReason::add); @@ -178,14 +185,6 @@ private void handleClusterRcaRequest(Map params, HttpExchange ex HttpURLConnection.HTTP_BAD_REQUEST); return; } - //check if we are querying from elected master - if (!validNodeRole()) { - JsonObject errResponse = new JsonObject(); - errResponse.addProperty("error", "Node being queried is not elected master."); - sendResponse(exchange, errResponse.toString(), - HttpURLConnection.HTTP_BAD_REQUEST); - return; - } String response = getRcaData(persistable, rcaList).toString(); sendResponse(exchange, response, HttpURLConnection.HTTP_OK); } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/threads/ThreadProvider.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/threads/ThreadProvider.java index 0cb36181b..21e3b20aa 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/threads/ThreadProvider.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/threads/ThreadProvider.java @@ -41,7 +41,13 @@ public class ThreadProvider { * @return The thread with the wrapped runnable. */ public Thread createThreadForRunnable(final Runnable innerRunnable, - final PerformanceAnalyzerThreads paThread) { + final PerformanceAnalyzerThreads paThread, String threadNameAppender) { + StringBuilder threadName = new StringBuilder(paThread.toString()); + if (!threadNameAppender.isEmpty()) { + threadName.append("-").append(threadNameAppender); + } + String threadNameStr = threadName.toString(); + Thread t = new Thread(() -> { try { innerRunnable.run(); @@ -56,11 +62,16 @@ public Thread createThreadForRunnable(final Runnable innerRunnable, } } StatsCollector.instance().logMetric(PA_THREADS_ENDED_METRIC_NAME); - LOG.info("Thread: {} completed.", paThread.toString()); - }, paThread.toString()); + LOG.info("Thread: {} completed.", threadNameStr); + }, threadNameStr); - LOG.info("Spun up a thread with name: {}", paThread.toString()); + LOG.info("Spun up a thread with name: {}", threadNameStr); StatsCollector.instance().logMetric(PA_THREADS_STARTED_METRIC_NAME); return t; } + + public Thread createThreadForRunnable(final Runnable innerRunnable, + final PerformanceAnalyzerThreads paThread) { + return createThreadForRunnable(innerRunnable, paThread, ""); + } } diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaControllerTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaControllerTest.java index 9d293c463..22515ba66 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaControllerTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaControllerTest.java @@ -34,7 +34,6 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Executors; @@ -47,7 +46,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.powermock.reflect.Whitebox; @Category(GradleTaskForRca.class) public class RcaControllerTest { @@ -129,7 +127,8 @@ public void setUp() throws Exception { rcaEnabledFileLoc.toString(), 100, 200, - appContext + appContext, + new MetricsDBProviderTestHelper() ); rcaController.setDbProvider(new MetricsDBProviderTestHelper()); @@ -203,14 +202,14 @@ public void readAndUpdateMutedRcasBeforeGraphCreation() throws Exception { Field mutedGraphNodesField = Stats.class.getDeclaredField("mutedGraphNodes"); mutedGraphNodesField.setAccessible(true); mutedGraphNodesField.set(Stats.getInstance(), null); - Set initialComponentSet = ConnectedComponent.getNodeNames(); - Whitebox.setInternalState(ConnectedComponent.class, "nodeNames", new HashSet<>()); + Set initialComponentSet = ConnectedComponent.getNodesForAllComponents(rcaController.getConnectedComponents()); + // Whitebox.setInternalState(ConnectedComponent.class, "nodeNames", new HashSet<>()); readAndUpdateMutesRcas.invoke(rcaController); Assert.assertNull(Stats.getInstance().getMutedGraphNodes()); // Re-set back to initialComponentSet - Whitebox.setInternalState(ConnectedComponent.class, "nodeNames", initialComponentSet); + // Whitebox.setInternalState(ConnectedComponent.class, "nodeNames", initialComponentSet); } @Test diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Cluster.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Cluster.java new file mode 100644 index 000000000..4558ac412 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Cluster.java @@ -0,0 +1,286 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerWebServer; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.core.Util; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AMetric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.ClusterType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.Consts; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.HostTag; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.threads.ThreadProvider; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.threads.exceptions.PAThreadException; +import com.google.gson.JsonElement; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.apache.commons.io.FileUtils; + +public class Cluster { + // A cluster can have 0 (single node) to 5 (multi node with dedicated masters) hosts. The following three + // maps specify what each host will be tagged as. + private static final Map hostIdToHostTagMapForDedicatedMaster = + new HashMap() {{ + put(0, HostTag.ELECTED_MASTER); + put(1, HostTag.STANDBY_MASTER_0); + put(2, HostTag.STANDBY_MASTER_1); + put(3, HostTag.DATA_0); + put(4, HostTag.DATA_1); + }}; + + private static final Map hostIdToHostTagMapCoLocatedMaster = + new HashMap() {{ + put(0, HostTag.ELECTED_MASTER); + put(1, HostTag.DATA_0); + }}; + + private static final HostTag hostIdToHostTagMapSingleNode = HostTag.DATA_0; + + // A queue where exceptions thrown by all the hosts will land. + private final BlockingQueue exceptionQueue; + private final boolean useHttps; + private final ClusterType clusterType; + + // The list of all the hosts in the cluster. + private final List hostList; + + // The top level directory that will be used by all the hosts for this iteration of the test. + private final File clusterDir; + + // Same as the thread provide object in PerformanceAnalyzerApp.java + private final ThreadProvider threadProvider; + + // If you want to get all the hosts that are assigned with role, say data_node, then this is the map + // to query. + private final Map> roleToHostMap; + + private final boolean rcaEnabled; + private Thread errorHandlingThread; + + // To get a host by a tag. Its the reverse mapping from what the top three maps contain. + private final Map tagToHostMapping; + + /** + * @param type The type of cluster - can be dedicated master, colocated master or single node. + * @param clusterDir The directory that will be used by the cluster for files. + * @param useHttps Should the http and grpc connections use https. + */ + public Cluster(final ClusterType type, final File clusterDir, final boolean useHttps) { + this.clusterType = type; + this.hostList = new ArrayList<>(); + this.roleToHostMap = new HashMap<>(); + this.clusterDir = clusterDir; + // We start off with the RCA turned off and turn it on only right before we + // invoke a test method. + this.rcaEnabled = false; + this.useHttps = useHttps; + this.threadProvider = new ThreadProvider(); + this.exceptionQueue = new ArrayBlockingQueue<>(1); + this.tagToHostMapping = new HashMap<>(); + + switch (type) { + case SINGLE_NODE: + createSingleNodeCluster(); + break; + case MULTI_NODE_CO_LOCATED_MASTER: + createMultiNodeCoLocatedMaster(); + break; + case MULTI_NODE_DEDICATED_MASTER: + createMultiNodeDedicatedMaster(); + break; + } + + for (Host host : hostList) { + host.setClusterDetails(hostList); + } + } + + private void createMultiNodeDedicatedMaster() { + int currWebServerPort = PerformanceAnalyzerWebServer.WEBSERVICE_DEFAULT_PORT; + int currGrpcServerPort = Util.RPC_PORT; + int hostIdx = 0; + + createHost(hostIdx, AllMetrics.NodeRole.ELECTED_MASTER, currWebServerPort, currGrpcServerPort); + + currWebServerPort += 1; + currGrpcServerPort += 1; + hostIdx += 1; + + for (int i = 0; i < Consts.numStandbyMasterNodes; i++) { + createHost(hostIdx, AllMetrics.NodeRole.MASTER, currWebServerPort, currGrpcServerPort); + + currWebServerPort += 1; + currGrpcServerPort += 1; + hostIdx += 1; + } + + for (int i = 0; i < Consts.numDataNodes; i++) { + createHost(hostIdx, AllMetrics.NodeRole.DATA, currWebServerPort, currGrpcServerPort); + + currWebServerPort += 1; + currGrpcServerPort += 1; + hostIdx += 1; + } + } + + private Host createHost(int hostIdx, + AllMetrics.NodeRole role, + int webServerPort, + int grpcServerPort) { + HostTag hostTag = getTagForHostIdForHostTagAssignment(hostIdx); + Host host = new Host(hostIdx, + useHttps, + role, + webServerPort, + grpcServerPort, + this.clusterDir, + rcaEnabled, + hostTag); + tagToHostMapping.put(hostTag, host); + hostList.add(host); + + List hostByRole = roleToHostMap.get(role); + + if (hostByRole == null) { + hostByRole = new ArrayList<>(); + hostByRole.add(host); + roleToHostMap.put(role, hostByRole); + } else { + hostByRole.add(host); + } + return host; + } + + private HostTag getTagForHostIdForHostTagAssignment(int hostId) { + switch (clusterType) { + case MULTI_NODE_DEDICATED_MASTER: + return hostIdToHostTagMapForDedicatedMaster.get(hostId); + case MULTI_NODE_CO_LOCATED_MASTER: + return hostIdToHostTagMapCoLocatedMaster.get(hostId); + case SINGLE_NODE: + return hostIdToHostTagMapSingleNode; + } + throw new IllegalStateException("No cluster type matches"); + } + + public void createServersAndThreads() { + this.errorHandlingThread = PerformanceAnalyzerApp.startErrorHandlingThread(threadProvider, exceptionQueue); + for (Host host : hostList) { + host.createServersAndThreads(threadProvider); + } + } + + public void startRcaControllerThread() { + for (Host host : hostList) { + host.startRcaControllerThread(); + } + } + + private void createSingleNodeCluster() { + int currWebServerPort = PerformanceAnalyzerWebServer.WEBSERVICE_DEFAULT_PORT; + int currGrpcServerPort = Util.RPC_PORT; + int hostIdx = 0; + + createHost(hostIdx, AllMetrics.NodeRole.ELECTED_MASTER, currWebServerPort, currGrpcServerPort); + } + + private void createMultiNodeCoLocatedMaster() { + int currWebServerPort = PerformanceAnalyzerWebServer.WEBSERVICE_DEFAULT_PORT; + int currGrpcServerPort = Util.RPC_PORT; + int hostIdx = 0; + + createHost(hostIdx, AllMetrics.NodeRole.ELECTED_MASTER, currWebServerPort, currGrpcServerPort); + + currWebServerPort += 1; + currGrpcServerPort += 1; + hostIdx += 1; + + for (int i = 0; i < Consts.numDataNodes - 1; i++) { + createHost(hostIdx, AllMetrics.NodeRole.DATA, currWebServerPort, currGrpcServerPort); + + currWebServerPort += 1; + currGrpcServerPort += 1; + hostIdx += 1; + } + } + + public void deleteCluster() throws IOException { + for (List hosts : roleToHostMap.values()) { + for (Host host : hosts) { + host.deleteHost(); + } + } + errorHandlingThread.interrupt(); + deleteClusterDir(); + } + + public void deleteClusterDir() throws IOException { + for (Host host : hostList) { + host.deleteHostDir(); + } + FileUtils.deleteDirectory(clusterDir); + } + + public void stopRcaScheduler() throws Exception { + for (Host host : hostList) { + host.stopRcaScheduler(); + } + } + + public void startRcaScheduler() throws Exception { + for (Host host : hostList) { + host.startRcaScheduler(); + } + } + + public void updateGraph(final Class rcaGraphClass) + throws NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException { + for (Host host : hostList) { + host.updateRcaGraph(rcaGraphClass); + } + } + + public void updateMetricsDB(AMetric[] metricAnnotations) throws Exception { + for (Host host : hostList) { + host.updateMetricsDB(metricAnnotations); + } + } + + public JsonElement getAllRcaDataOnHost(HostTag hostTag, String rcaName) { + return tagToHostMapping.get(hostTag).getDataForRca(rcaName); + } + + public String getRcaRestResponse(final Map params, HostTag hostByTag) { + return verifyTag(hostByTag).makeRestRequest(params); + } + + private Host verifyTag(HostTag hostTag) { + Host host = tagToHostMapping.get(hostTag); + if (host == null) { + throw new IllegalArgumentException("No host with tag '" + hostTag + "' exists. " + + "Available tags are: " + tagToHostMapping.keySet()); + } + return host; + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Host.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Host.java new file mode 100644 index 000000000..7c8da0b26 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Host.java @@ -0,0 +1,436 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.ClientServers; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.core.Util; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.GRPCConnectionManager; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaController; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AMetric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ATable; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ATuple; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.Consts; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.HostTag; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.overrides.RcaControllerIt; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.overrides.RcaItMetricsDBProvider; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.RCAScheduler; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.RcaSchedulerState; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.threads.ThreadProvider; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.InvocationTargetException; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.ProtocolException; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jooq.Record; +import org.jooq.Result; +import org.junit.Assert; + +/** + * This class simulates a cluster node that executes an RCA graph. Each node has its own + * - GRPC server, + * - web server + * - RCAController and everything that it starts. + */ +public class Host { + private static final Logger LOG = LogManager.getLogger(Host.class); + private final boolean useHttps; + /** + * Each host has its own AppContext instance. + */ + private final AppContext appContext; + private final HostTag myTag; + /** + * This uniquely identifies a host. + */ + private final int hostId; + /** + * For Integration tests, where all the virtual nodes are part of the same JVM, Ip string does not matter. But for + * the sake of having this value filled, the string is 127.0.0.(hostId). + */ + private final String hostIp; + private final AllMetrics.NodeRole role; + private final int webServerPort; + private final int grpcServerPort; + private final File hostDir; + private final boolean rcaEnabled; + private GRPCConnectionManager connectionManager; + private ClientServers clientServers; + private ScheduledExecutorService netOperationsExecutor; + private RcaControllerIt rcaController; + private Thread grpcThread; + private Thread webServerThread; + private Thread rcaControllerThread; + private ThreadProvider threadProvider; + private Path rcaEnabledFile; + + public Host(int hostId, + boolean useHttps, + AllMetrics.NodeRole role, + int httpServerPort, + int grpcServerPort, + File clusterDir, + boolean rcaEnabled, + HostTag myTag) { + this.rcaEnabled = rcaEnabled; + this.useHttps = useHttps; + this.appContext = new AppContext(); + + this.hostId = hostId; + this.myTag = myTag; + + //TODO: make sure this works with the grpc and the webserver. + this.hostIp = createHostIp(); + this.role = role; + + this.webServerPort = httpServerPort; + this.grpcServerPort = grpcServerPort; + + this.hostDir = createHostDir(clusterDir, myTag); + } + + public static String createHostIp() { + return "127.0.0.1"; + } + + private static File createHostDir(File clusterDir, HostTag hostTag) { + File hostFile = Paths.get(clusterDir.getAbsolutePath(), hostTag.toString()).toFile(); + if (!hostFile.exists() && !hostFile.mkdirs()) { + throw new IllegalStateException("Couldn't create dir: " + hostFile); + } + return hostFile; + } + + public void createServersAndThreads(final ThreadProvider threadProvider) { + this.threadProvider = threadProvider; + Objects.requireNonNull(appContext.getClusterDetailsEventProcessor(), + "ClusterDetailsEventProcessor cannot be null in the AppContext"); + + rcaEnabledFile = Paths.get(hostDir.getAbsolutePath(), RcaController.RCA_ENABLED_CONF_FILE); + RcaSchedulerState state = rcaEnabled ? RcaSchedulerState.STATE_STARTED : RcaSchedulerState.STATE_STOPPED; + setExpectedRcaState(state); + + this.connectionManager = new GRPCConnectionManager(useHttps); + this.clientServers = PerformanceAnalyzerApp.createClientServers(connectionManager, + grpcServerPort, + null, + null, + useHttps, + String.valueOf(webServerPort), + null, // A null host is fine as this will use the loopback address + this.appContext); + + this.grpcThread = PerformanceAnalyzerApp.startGrpcServerThread(clientServers.getNetServer(), threadProvider); + this.webServerThread = PerformanceAnalyzerApp.startWebServerThread(clientServers.getHttpServer(), threadProvider); + + netOperationsExecutor = + Executors.newScheduledThreadPool( + 3, new ThreadFactoryBuilder().setNameFormat("test-network-thread-%d").build()); + + this.rcaController = new RcaControllerIt( + threadProvider, + netOperationsExecutor, + connectionManager, + clientServers, + hostDir.getAbsolutePath(), + 10, + 10, + role, + appContext, + null); + } + + // We create a temporary file and then swap it for the rca.enabled file. + public void setExpectedRcaState(RcaSchedulerState rcaState) { + Path rcaEnabledTmp = Paths.get(rcaEnabledFile + ".tmp"); + try (FileWriter f2 = new FileWriter(rcaEnabledTmp.toFile(), false /*To create a new file*/)) { + boolean value = true; + switch (rcaState) { + case STATE_NOT_STARTED: + break; + case STATE_STOPPED_DUE_TO_EXCEPTION: + break; + case STATE_STARTED: + value = true; + break; + case STATE_STOPPED: + value = false; + break; + } + f2.write(String.valueOf(value)); + } catch (IOException e) { + e.printStackTrace(); + return; + } + rcaEnabledTmp.toFile().renameTo(rcaEnabledFile.toFile()); + } + + public void setClusterDetails(final List allHosts) { + List nodeDetails = new ArrayList<>(); + + // The first node in the list is always the node-itself. + nodeDetails.add(hostToNodeDetails(this)); + + for (Host host : allHosts) { + if (host.hostId != this.hostId) { + nodeDetails.add(hostToNodeDetails(host)); + } + } + ClusterDetailsEventProcessor clusterDetailsEventProcessor = new ClusterDetailsEventProcessor(); + clusterDetailsEventProcessor.setNodesDetails(nodeDetails); + appContext.setClusterDetailsEventProcessor(clusterDetailsEventProcessor); + } + + public static ClusterDetailsEventProcessor.NodeDetails hostToNodeDetails(final Host host) { + return new ClusterDetailsEventProcessor.NodeDetails( + host.role, + host.getMyTag().toString(), + host.hostIp, + host.isElectedMaster(), + host.grpcServerPort); + } + + public HostTag getMyTag() { + return myTag; + } + + public boolean isElectedMaster() { + return AllMetrics.NodeRole.ELECTED_MASTER == this.role; + } + + public void deleteHost() throws IOException { + try { + stopRcaScheduler(); + } catch (Exception e) { + LOG.error("** Error shutting down the scheduler while deleting host.", e); + } + RCAScheduler rcaScheduler = rcaController.getRcaScheduler(); + if (rcaScheduler != null && rcaScheduler.getState() == RcaSchedulerState.STATE_STARTED) { + rcaScheduler.shutdown(); + } + netOperationsExecutor.shutdown(); + try { + netOperationsExecutor.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + e.printStackTrace(); + } + clientServers.getHttpServer().stop(10); + clientServers.getNetClient().stop(); + clientServers.getNetServer().shutdown(); + + connectionManager.shutdown(); + + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + ie.printStackTrace(); + } + + webServerThread.interrupt(); + + clientServers.getNetServer().setAttemptedShutdown(); + grpcThread.interrupt(); + + rcaController.setDeliberateInterrupt(); + + LOG.info("RCA Controller thread for host {} is being interrupted." + hostId); + rcaControllerThread.interrupt(); + deleteHostDir(); + LOG.info("Host '{} with role '{}' cleaned up.", hostId, rcaController.getCurrentRole()); + } + + public void deleteHostDir() throws IOException { + FileUtils.deleteDirectory(hostDir); + } + + public void stopRcaScheduler() throws Exception { + RCAScheduler sched = rcaController.getRcaScheduler(); + CountDownLatch shutdownLatch = null; + if (sched != null) { + shutdownLatch = new CountDownLatch(1); + sched.setSchedulerTrackingLatch(shutdownLatch); + } + setExpectedRcaState(RcaSchedulerState.STATE_STOPPED); + if (shutdownLatch != null) { + shutdownLatch.await(10, TimeUnit.SECONDS); + } + LOG.info("RCA Scheduler is STOPPED by TestRunner on node: {}", myTag); + } + + public void startRcaControllerThread() { + this.rcaControllerThread = PerformanceAnalyzerApp.startRcaTopLevelThread( + rcaController, + threadProvider, + appContext.getMyInstanceDetails().getInstanceId().toString()); + } + + public void startRcaScheduler() throws Exception { + setExpectedRcaState(RcaSchedulerState.STATE_STARTED); + rcaController.waitForRcaState(RcaSchedulerState.STATE_STARTED); + } + + public void updateRcaGraph(final Class rcaGraphClass) + throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { + rcaController.setRcaGraphComponents(rcaGraphClass); + } + + public void updateMetricsDB(AMetric[] metricAnnotations) throws Exception { + RcaItMetricsDBProvider dbProvider = + new RcaItMetricsDBProvider(Paths.get(hostDir.getPath(), "metricsdb").toString()); + for (AMetric metric : metricAnnotations) { + boolean foundDataForHost = false; + // Each metric can have only one data table that can be associated to a host. + // Which one is determined by the hostTag. The first matching is added to the host + // for the current metric. + dataLoop: + for (ATable table : metric.tables()) { + for (HostTag dataTag : table.hostTag()) { + if (myTag == dataTag) { + // First data-tag to match the hostTags is considered to be a match + for (ATuple tuple : table.tuple()) { + String metricName; + try { + metricName = (String) metric.name().getField("NAME").get(null); + } catch (Exception ex) { + LOG.error("Error getting metric name.", ex); + throw ex; + } + dbProvider.insertRow( + metricName, + metric.dimensionNames(), + tuple.dimensionValues(), + tuple.min(), + tuple.max(), + tuple.avg(), + tuple.sum()); + } + foundDataForHost = true; + // We found a data table matching the tags of the host. Let's move to the + // next metric. + break dataLoop; + } + } + } + if (!foundDataForHost) { + // This is not an error though. For example, a dedicated master node cannot emit + // a shard related metric. + System.out.println("No data found for host " + hostId + " for metric " + metric.name()); + } + } + rcaController.setDbProvider(dbProvider); + } + + public JsonElement getDataForRca(String rcaName) { + JsonElement data = this.rcaController.getPersistenceProvider().read(rcaName); + JsonObject obj = new JsonObject(); + obj.addProperty(Consts.HOST_ID_KEY, hostId); + obj.addProperty(Consts.HOST_ROLE_KEY, role.toString()); + obj.add(Consts.DATA_KEY, data); + return obj; + } + + public String makeRestRequest(final Map kvRequestParams) { + StringBuilder queryString = new StringBuilder(); + + String appender = ""; + for (Map.Entry entry : kvRequestParams.entrySet()) { + queryString.append(appender).append(entry.getKey()).append("=").append(entry.getValue()); + appender = "&"; + } + StringBuilder uri = + new StringBuilder("http://localhost:" + webServerPort + Util.RCA_QUERY_URL); + uri.append("?").append(queryString); + + URL url = null; + try { + url = new URL(uri.toString()); + } catch (MalformedURLException e) { + e.printStackTrace(); + Assert.fail(); + } + + String response = ""; + HttpURLConnection connection = null; + + try { + connection = (HttpURLConnection) url.openConnection(); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + + try { + connection.setRequestMethod("GET"); + } catch (ProtocolException e) { + e.printStackTrace(); + connection.disconnect(); + Assert.fail(); + } + + try { + int status = connection.getResponseCode(); + if (status != 200) { + List ret = + new BufferedReader(new InputStreamReader(connection.getErrorStream())).lines().collect(Collectors.toList()); + throw new IllegalStateException(ret.toString()); + } + } catch (IOException e) { + e.printStackTrace(); + connection.disconnect(); + Assert.fail(); + } + + try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { + String inputLine; + StringBuffer content = new StringBuffer(); + while ((inputLine = in.readLine()) != null) { + content.append(inputLine); + } + response = content.toString(); + } catch (IOException e) { + e.printStackTrace(); + connection.disconnect(); + Assert.fail(); + } + return response; + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/TestEnvironment.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/TestEnvironment.java new file mode 100644 index 000000000..d24a6d742 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/TestEnvironment.java @@ -0,0 +1,151 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaControllerHelper; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AMetric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ARcaConf; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ARcaGraph; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestEnvironment { + private final Cluster cluster; + private final Env classLevelEnv; + + private Env currentEnv; + + public TestEnvironment(final Cluster cluster, final Class testClass) throws Exception { + this.cluster = cluster; + this.classLevelEnv = updateEnvironment(testClass); + } + + private Env updateEnvironment(final Class testClass) throws Exception { + boolean annotationsPresent = testClass.isAnnotationPresent(ARcaConf.class) + | testClass.isAnnotationPresent(ARcaGraph.class) + | testClass.isAnnotationPresent(ARcaConf.class); + + Env env = new Env(); + + if (annotationsPresent) { + if (testClass.isAnnotationPresent(ARcaConf.class)) { + updateRcaConf((ARcaConf) testClass.getAnnotation(ARcaConf.class), env); + } + if (testClass.isAnnotationPresent(ARcaGraph.class)) { + updateRcaGraph((ARcaGraph) testClass.getAnnotation(ARcaGraph.class), env); + } + + if (testClass.isAnnotationPresent(AMetric.Metrics.class) + || testClass.isAnnotationPresent(AMetric.class)) { + updateMetricsDB((AMetric[]) testClass.getAnnotationsByType(AMetric.class), env); + } + } + if (env.rcaConfMap.isEmpty()) { + updateWithDefaultRcaConfAnnotation(env); + } + return env; + } + + public void updateEnvironment(final Method method) throws Exception { + boolean annotationsPresent = method.isAnnotationPresent(ARcaConf.class) + | method.isAnnotationPresent(ARcaGraph.class) + | method.isAnnotationPresent(ARcaConf.class); + + this.currentEnv = new Env(this.classLevelEnv); + + if (annotationsPresent) { + if (method.isAnnotationPresent(ARcaConf.class)) { + updateRcaConf(method.getAnnotation(ARcaConf.class), currentEnv); + } + if (method.isAnnotationPresent(ARcaGraph.class)) { + updateRcaGraph(method.getAnnotation(ARcaGraph.class), currentEnv); + } + + if (method.isAnnotationPresent(AMetric.Metrics.class)) { + updateMetricsDB(method.getAnnotationsByType(AMetric.class), currentEnv); + } + } + } + + private void updateRcaConf(ARcaConf aRcaConf, Env env) { + String masterRcaConf = aRcaConf.electedMaster(); + env.rcaConfMap.put(ARcaConf.Type.ELECTED_MASTER, masterRcaConf); + + String standByMasterRcaConf = aRcaConf.standBy(); + env.rcaConfMap.put(ARcaConf.Type.STANDBY_MASTER, standByMasterRcaConf); + + String dataNodesRcaConf = aRcaConf.dataNode(); + env.rcaConfMap.put(ARcaConf.Type.DATA_NODES, dataNodesRcaConf); + + RcaControllerHelper.set(dataNodesRcaConf, standByMasterRcaConf, masterRcaConf); + } + + private void updateRcaGraph(ARcaGraph aRcaGraph, Env env) throws NoSuchMethodException, + IllegalAccessException, InvocationTargetException, InstantiationException { + Class graphClass = aRcaGraph.value(); + env.rcaGraphClass = graphClass; + cluster.updateGraph(graphClass); + } + + private void updateMetricsDB(AMetric[] metrics, Env env) throws Exception { + cluster.updateMetricsDB(metrics); + env.isMetricsDBProviderSet = true; + } + + private void updateWithDefaultRcaConfAnnotation(Env env) throws NoSuchMethodException { + String masterRcaConf = (String) ARcaConf.class.getMethod("electedMaster").getDefaultValue(); + env.rcaConfMap.put(ARcaConf.Type.ELECTED_MASTER, masterRcaConf); + + String standByMasterRcaConf = + (String) ARcaConf.class.getMethod("standBy").getDefaultValue(); + env.rcaConfMap.put(ARcaConf.Type.STANDBY_MASTER, standByMasterRcaConf); + + String dataNodesRcaConf = + (String) ARcaConf.class.getMethod("dataNode").getDefaultValue(); + env.rcaConfMap.put(ARcaConf.Type.DATA_NODES, dataNodesRcaConf); + + RcaControllerHelper.set(dataNodesRcaConf, standByMasterRcaConf, masterRcaConf); + } + + public void clearUpMethodLevelEnvOverride() { + this.currentEnv = this.classLevelEnv; + } + + public void verifyEnvironmentSetup() throws IllegalStateException { + List annotationsNotSet = new ArrayList<>(); + + if (currentEnv.rcaConfMap.isEmpty()) { + annotationsNotSet.add(ARcaGraph.class.getSimpleName()); + } + if (currentEnv.rcaGraphClass == null) { + annotationsNotSet.add(ARcaGraph.class.getSimpleName()); + } + if (!currentEnv.isMetricsDBProviderSet) { + annotationsNotSet.add(AMetric.class.getSimpleName()); + } + + if (!annotationsNotSet.isEmpty()) { + throw new IllegalStateException("** Some annotations are not set" + annotationsNotSet); + } + } + + class Env { + private final Map rcaConfMap; + private Class rcaGraphClass; + private boolean isMetricsDBProviderSet; + + Env() { + this.rcaConfMap = new HashMap<>(); + this.rcaGraphClass = null; + this.isMetricsDBProviderSet = false; + } + + Env(Env other) { + this.rcaConfMap = other.rcaConfMap; + this.rcaGraphClass = other.rcaGraphClass; + this.isMetricsDBProviderSet = other.isMetricsDBProviderSet; + } + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/AClusterType.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/AClusterType.java new file mode 100644 index 000000000..62457223a --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/AClusterType.java @@ -0,0 +1,18 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.ClusterType; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * This is a class level annotation that must be present for each of the RCAIt + * test classes. This specifies the cluster type - single node vs multi-node + * with dedicated master vs multi-node with co-located master. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface AClusterType { + ClusterType value(); +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/AExpect.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/AExpect.java new file mode 100644 index 000000000..c089a9a65 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/AExpect.java @@ -0,0 +1,64 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.HostTag; +import java.lang.annotation.ElementType; +import java.lang.annotation.Repeatable; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * This annotation is used for validating the result. After invoking the test, + * the RcaItRunner periodically checks the output and passes the test if it + * passes the validator checks. + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@Repeatable(value = AExpect.Expectations.class) +public @interface AExpect { + /** + * @return What will be queried and matched against the expectations. + */ + Type what(); + + /** + * Which node will be queried for this data ? + * + * @return The node to be queried for this + */ + HostTag on(); + + /** + * How long shall we wait for the expected result to show up. + * + * @return timeout in seconds + */ + long timeoutSeconds() default 60; + + /** + * Which custom validator should be called with the current query-results. + * + * @return The class that will be used for validation. + */ + Class validator(); + + /** + * Which RCA should we ask for from the SQLite or the rest API. + * + * @return The Rca class to look for. + */ + Class forRca(); + + /** + * Currently supported places to look for RCA outputs are the SQLite file or by hitting the rest endpoint. + */ + enum Type { + REST_API + } + + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + @interface Expectations { + AExpect[] value(); + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/AMetric.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/AMetric.java new file mode 100644 index 000000000..a05b81df7 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/AMetric.java @@ -0,0 +1,34 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Repeatable; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * This annotation can be used to specify the metrics that will be poured onto the RCA Graph. Under the covers, + * the framework will try to simulate a DBProvider that will respond with these metrics when the RCA Metrics + * nodes query for one. This annotation lets you specify one or more metric tables, similar to the 5 second + * metric snapshots of the metricsdb files, and also specify a table for one or a group of cluster hosts. + */ +@Retention(RetentionPolicy.RUNTIME) +@Repeatable(value = AMetric.Metrics.class) +@Target({ElementType.TYPE, ElementType.METHOD}) +public @interface AMetric { + // The metric this data is emulating. + Class name(); + + // The names of the dimension columns that the metrics has. The dimensions can be found here: + // https://opendistro.github.io/for-elasticsearch-docs/docs/pa/reference/ + String[] dimensionNames(); + + // Specify one or more tables for the metric where each table belongs to a host or a group of hosts. + ATable[] tables(); + + @Retention(RetentionPolicy.RUNTIME) + @Target({ElementType.TYPE, ElementType.METHOD}) + @interface Metrics { + AMetric[] value(); + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ARcaConf.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ARcaConf.java new file mode 100644 index 000000000..48642a82a --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ARcaConf.java @@ -0,0 +1,30 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.Consts; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * This annotation can be used to specify an rca.conf file. Usually tests don't need to provide the rca.conf + * therefore, it uses the rca.conf* files in the test/resources as defaults. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface ARcaConf { + // full path to the rca.conf file to be used by elected master node. + String electedMaster() default Consts.RCAIT_DEFAULT_RCA_CONF_ELECTED_MASTER_NODE; + + // full path to the rca.conf file to be used by the standby master. + String standBy() default Consts.RCAIT_DEFAULT_RCA_CONF_STANDBY_MASTER_NODE; + + // full path to the rca.conf file to be used by the data node. + String dataNode() default Consts.RCAIT_DEFAULT_RCA_CONF_DATA_NODE; + + enum Type { + ELECTED_MASTER, + STANDBY_MASTER, + DATA_NODES + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ARcaGraph.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ARcaGraph.java new file mode 100644 index 000000000..ced737011 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ARcaGraph.java @@ -0,0 +1,15 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * This can be used to specify the RCA graph to be used by all nodes of the cluster. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD}) +public @interface ARcaGraph { + Class value(); +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ATable.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ATable.java new file mode 100644 index 000000000..e793c377a --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ATable.java @@ -0,0 +1,17 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.HostTag; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +/** + * This specifies a table for a given metric. This annotation is a sub-field of the AMetric annotation. + */ +@Retention(RetentionPolicy.RUNTIME) +public @interface ATable { + // Which host should emit this metric + HostTag[] hostTag(); + + // The data in tabular form. + ATuple[] tuple(); +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ATuple.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ATuple.java new file mode 100644 index 000000000..777144da5 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/annotations/ATuple.java @@ -0,0 +1,21 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +/** + * Should not be used on it own. This is a sub-field of the AMetric annotation. This represents one row of data + * for a Metric. All the dimension values are specified as Strings. + */ +@Retention(RetentionPolicy.RUNTIME) +public @interface ATuple { + String[] dimensionValues(); + + double min(); + + double max(); + + double sum(); + + double avg(); +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/api/IValidator.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/api/IValidator.java new file mode 100644 index 000000000..60034de7f --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/api/IValidator.java @@ -0,0 +1,20 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.api; + +import com.google.gson.JsonElement; +import org.jooq.Record; +import org.jooq.Result; + +/** + * This interface specifies a Validator that can used for validation of results. Based on what is required to be validated + * the RCA-IT framework will call one of the check methods with the latest data from the current iteration. + */ +public interface IValidator { + + /** + * Based on what is required to be validated, + * + * @param response This REST response as a JSONElement. + * @return true, if this matches expectation. + */ + boolean check(JsonElement response); +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/api/TestApi.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/api/TestApi.java new file mode 100644 index 000000000..19247440d --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/api/TestApi.java @@ -0,0 +1,47 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.api; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.Cluster; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.HostTag; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nonnull; +import org.jooq.Record; +import org.jooq.Result; + +/** + * This is API class whose object is injected into each of the test methods in case test class + * declares a @{code setTestApi(final TestApi api)}. + */ +public class TestApi { + /** + * An instance of the cluster object to get access to the nodes to query various data to see + * that the tests have the desired results. + */ + private final Cluster cluster; + + public TestApi(Cluster cluster) { + this.cluster = cluster; + } + + public JsonElement getRcaDataOnHost(HostTag hostTag, String rcaName) { + return cluster.getAllRcaDataOnHost(hostTag, rcaName); + } + + /** + * This let's you make a REST request to the REST endpoint of a particular host identified by + * the host tag. + * + * @param params the key value map that is passes as the request parameter. + * @param hostByTag The host whose rest endpoint we will hit. + * @return The response serialized as a String. + */ + public String getRcaRestResponse(@Nonnull final Map params, + HostTag hostByTag) { + Objects.requireNonNull(params); + return cluster.getRcaRestResponse(params, hostByTag); + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/ClusterType.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/ClusterType.java new file mode 100644 index 000000000..8f23546d6 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/ClusterType.java @@ -0,0 +1,41 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs; + +/** + * Types of clusters supported by the integration test framework. + */ +public enum ClusterType { + /** + * A single node cluster. The only node in the cluster will be tagged as DATA_0 + */ + SINGLE_NODE, + /** + * A two data-nodes clusters where data-node0 acts as the elected master. + * node0 will be tagged as ELECTED_MASTER and node1 will be tagged as DATA_0. + */ + MULTI_NODE_CO_LOCATED_MASTER, + + /** + * a three dedicated master nodes and two data node cluster. + * node0 will be tagged as ELECTED_MASTER. + * node1 will be tagged as STANDBY_MASTER_0. + * node2 will be tagged as STANDBY_MASTER_1. + * node3 will be tagged as DATA_0. + * node4 will be tagged as DATA_1. + */ + MULTI_NODE_DEDICATED_MASTER +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/Consts.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/Consts.java new file mode 100644 index 000000000..5f60ae4ad --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/Consts.java @@ -0,0 +1,20 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs; + +public class Consts { + public static final String RCA_IT_BASE_DIR = "/tmp/rcaIt"; + + // This format is used to create a directory that will be used by the simulated host for the current test run. + public static final String RCA_IT_CLUSTER_DIR_FORMAT = "yyyy.MM.dd.HH.mm.ss"; + public static final String TEST_RESOURCES_DIR = "./src/test/resources/rca/"; + public static final String RCAIT_DEFAULT_RCA_CONF_ELECTED_MASTER_NODE = TEST_RESOURCES_DIR + "rca_elected_master.conf"; + public static final String RCAIT_DEFAULT_RCA_CONF_STANDBY_MASTER_NODE = TEST_RESOURCES_DIR + "rca_master.conf"; + public static final String RCAIT_DEFAULT_RCA_CONF_DATA_NODE = TEST_RESOURCES_DIR + "rca.conf"; + + public static final String HOST_ID_KEY = "hostId"; + public static final String HOST_ROLE_KEY = "hostRole"; + public static final String DATA_KEY = "data"; + + // Node count constants. + public static final int numDataNodes = 2; + public static final int numStandbyMasterNodes = 2; +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/HostTag.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/HostTag.java new file mode 100644 index 000000000..0a80d3bc3 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/configs/HostTag.java @@ -0,0 +1,11 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs; + +public enum HostTag { + ELECTED_MASTER, + + // The STANDBY_MASTER_X tags are only used in dedicated master node clusters. + STANDBY_MASTER_0, + STANDBY_MASTER_1, + DATA_0, + DATA_1, +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/overrides/RcaConfIt.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/overrides/RcaConfIt.java new file mode 100644 index 000000000..f0a993aaf --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/overrides/RcaConfIt.java @@ -0,0 +1,46 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.overrides; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts; +import java.util.HashMap; +import java.util.Map; + +public class RcaConfIt extends RcaConf { + private String rcaDataStorePath; + private String rcaAnalysisGraph; + + public RcaConfIt(RcaConf rcaConf) { + super(rcaConf.getConfigFileLoc()); + } + + public void setRcaDataStorePath(String dataStorePath) { + this.rcaDataStorePath = dataStorePath; + } + + @Override + public Map getDatastore() { + Map map = new HashMap<>(super.getDatastore()); + map.put(RcaConsts.DATASTORE_LOC_KEY, rcaDataStorePath); + return map; + } + + @Override + public String getAnalysisGraphEntryPoint() { + return rcaAnalysisGraph; + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/overrides/RcaControllerIt.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/overrides/RcaControllerIt.java new file mode 100644 index 000000000..97527180a --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/overrides/RcaControllerIt.java @@ -0,0 +1,98 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.overrides; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.ClientServers; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.GRPCConnectionManager; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaController; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.AnalysisGraph; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.ConnectedComponent; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Queryable; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaUtil; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.RcaSchedulerState; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.threads.ThreadProvider; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.util.WaitFor; +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class RcaControllerIt extends RcaController { + private final String rcaPath; + private List rcaGraphComponents; + + public RcaControllerIt(ThreadProvider threadProvider, + ScheduledExecutorService netOpsExecutorService, + GRPCConnectionManager grpcConnectionManager, + ClientServers clientServers, + String rca_enabled_conf_location, + long rcaStateCheckIntervalMillis, + long nodeRoleCheckPeriodicityMillis, + AllMetrics.NodeRole nodeRole, + final AppContext appContext, + final Queryable dbProvider) { + super(threadProvider, + netOpsExecutorService, + grpcConnectionManager, + clientServers, + rca_enabled_conf_location, + rcaStateCheckIntervalMillis, + nodeRoleCheckPeriodicityMillis, + appContext, + dbProvider); + this.currentRole = nodeRole; + this.rcaPath = rca_enabled_conf_location; + } + + @Override + protected List getRcaGraphComponents(RcaConf rcaConf) throws + ClassNotFoundException, + NoSuchMethodException, + InvocationTargetException, + InstantiationException, + IllegalAccessException { + if (rcaGraphComponents != null) { + return rcaGraphComponents; + } else { + return super.getRcaGraphComponents(rcaConf); + } + } + + @Override + protected RcaConf getRcaConfForMyRole(AllMetrics.NodeRole nodeRole) { + RcaConfIt rcaConfIt = new RcaConfIt(super.getRcaConfForMyRole(nodeRole)); + rcaConfIt.setRcaDataStorePath(rcaPath); + return rcaConfIt; + } + + public void setDbProvider(final Queryable db) { + dbProvider = db; + } + + public void setRcaGraphComponents(Class rcaGraphClass) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { + AnalysisGraph graphObject = + (AnalysisGraph) rcaGraphClass.getDeclaredConstructor().newInstance(); + this.rcaGraphComponents = RcaUtil.getAnalysisGraphComponents(graphObject); + } + + public void waitForRcaState(RcaSchedulerState state) throws Exception { + WaitFor.waitFor(() -> getRcaScheduler() != null && getRcaScheduler().getState() == state, 20, TimeUnit.SECONDS); + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/overrides/RcaItMetricsDBProvider.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/overrides/RcaItMetricsDBProvider.java new file mode 100644 index 000000000..db70b9966 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/overrides/RcaItMetricsDBProvider.java @@ -0,0 +1,68 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.overrides; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.Dimensions; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.Metric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.MetricsDBProvider; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; + +public class RcaItMetricsDBProvider extends MetricsDBProvider { + private final String DB_FILE_PATH; + private final MetricsDB db; + + public RcaItMetricsDBProvider(String metricsDbFilePath) throws Exception { + DB_FILE_PATH = metricsDbFilePath; + // Cleanup the file if exists. + try { + Files.delete(Paths.get(DB_FILE_PATH)); + } catch (NoSuchFileException ignored) { + } + + try { + Files.delete(Paths.get(DB_FILE_PATH + "-journal")); + } catch (NoSuchFileException ignored) { + } + + // TODO: clean up the DB after the tests are done. + db = new MetricsDB(System.currentTimeMillis()) { + @Override + public String getDBFilePath() { + Path configPath = Paths.get(DB_FILE_PATH); + return configPath.toString(); + } + + @Override + public void deleteOnDiskFile() { + try { + Files.delete(Paths.get(getDBFilePath())); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + } + + @Override + public MetricsDB getMetricsDB() { + return db; + } + + public void insertRow(String metricName, + String[] dimensionNames, + String[] dimensionValues, + double min, double max, double avg, double sum) { + Dimensions dimensions = new Dimensions(); + for (int i = 0; i < dimensionNames.length; i++) { + dimensions.put(dimensionNames[i], dimensionValues[i]); + } + Metric metric = new Metric(metricName, sum, avg, min, max); + + db.createMetric(metric, Arrays.asList(dimensionNames)); + db.putMetric(metric, dimensions, 0); + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/IRcaItRunner.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/IRcaItRunner.java new file mode 100644 index 000000000..4ede82e49 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/IRcaItRunner.java @@ -0,0 +1,40 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.runners; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.Cluster; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.ClusterType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.Consts; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Timestamp; +import java.text.SimpleDateFormat; +import org.apache.commons.io.FileUtils; + +public interface IRcaItRunner { + String SET_CLUSTER_METHOD = "setTestApi"; + + default Cluster createCluster(ClusterType clusterType, boolean useHttps) throws IOException { + SimpleDateFormat sdf = new SimpleDateFormat(Consts.RCA_IT_CLUSTER_DIR_FORMAT); + Timestamp timestamp = new Timestamp(System.currentTimeMillis()); + String formattedTime = sdf.format(timestamp); + Path rcaItDir = Paths.get(Consts.RCA_IT_BASE_DIR, formattedTime); + + cleanUpDirsFromLastRuns(); + createITDirForThisRun(rcaItDir); + + return new Cluster(clusterType, rcaItDir.toFile(), useHttps); + } + + default void cleanUpDirsFromLastRuns() throws IOException { + FileUtils.deleteDirectory(Paths.get(Consts.RCA_IT_BASE_DIR).toFile()); + } + + default void createITDirForThisRun(Path rcaItDir) { + File clusterDir = rcaItDir.toFile(); + File parent = clusterDir.getParentFile(); + if (!clusterDir.exists() && !clusterDir.mkdirs()) { + throw new IllegalStateException("Couldn't create dir: " + parent); + } + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/RcaItNotEncryptedRunner.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/RcaItNotEncryptedRunner.java new file mode 100644 index 000000000..6850240c8 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/RcaItNotEncryptedRunner.java @@ -0,0 +1,9 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.runners; + +public class RcaItNotEncryptedRunner extends RcaItRunnerBase { + private static final boolean USE_HTTPS = false; + + public RcaItNotEncryptedRunner(Class testClass) throws Exception { + super(testClass, USE_HTTPS); + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/RcaItRunnerBase.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/RcaItRunnerBase.java new file mode 100644 index 000000000..92ed3bb1c --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/runners/RcaItRunnerBase.java @@ -0,0 +1,203 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.runners; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.Cluster; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.TestEnvironment; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AClusterType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AExpect; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.api.IValidator; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.api.TestApi; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.ClusterType; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Test; +import org.junit.runner.Description; +import org.junit.runner.Runner; +import org.junit.runner.notification.Failure; +import org.junit.runner.notification.RunNotifier; + +/** + * This is the main runner class that is used by the RCA-IT. + */ +public abstract class RcaItRunnerBase extends Runner implements IRcaItRunner { + private static final Logger LOG = LogManager.getLogger(RcaItRunnerBase.class); + + // The class whose tests the runner is currently executing. + private final Class testClass; + + // An instance of the test class the runner is executing. + private final Object testObject; + + // This is used to set up the environment. An environment for running RCA graph would be to push the RCA graph itself, + // the metrics, the rca.conf if that needs to be changed. It reads them from the annotations and sets them up for the + // cluster object. + private final TestEnvironment testEnvironment; + + // An instance of the cluster where tests are running. + private final Cluster cluster; + + // This is wrapper on top of the cluster object that is passed on to the testClass to get access to the cluster. + private final TestApi testApi; + + public RcaItRunnerBase(Class testClass, boolean useHttps) throws Exception { + super(); + this.testClass = testClass; + ClusterType clusterType = getClusterTypeFromAnnotation(testClass); + this.cluster = createCluster(clusterType, useHttps); + this.testApi = new TestApi(cluster); + this.testObject = testClass.getDeclaredConstructor().newInstance(); + + try { + Method setClusterMethod = testClass.getMethod(SET_CLUSTER_METHOD, TestApi.class); + setClusterMethod.setAccessible(true); + setClusterMethod.invoke(testObject, testApi); + } catch (NoSuchMethodException ex) { + // This test class hasn't defined a method setCluster(Cluster). SO probably it does not need + // access to the cluster object. Which is fine. We move on to the method execution. + } + + cluster.createServersAndThreads(); + try { + this.testEnvironment = new TestEnvironment(cluster, testClass); + } catch (Exception ex) { + cluster.deleteClusterDir(); + ex.printStackTrace(); + throw ex; + } + cluster.startRcaControllerThread(); + } + + private static ClusterType getClusterTypeFromAnnotation(Class testClass) { + if (!testClass.isAnnotationPresent(AClusterType.class)) { + throw new IllegalArgumentException( + testClass.getSimpleName() + " does not have the mandatory annotation: " + AClusterType.class.getSimpleName()); + } + return ((AClusterType) testClass.getAnnotation(AClusterType.class)).value(); + } + + @Override + public Description getDescription() { + return Description.createTestDescription(testClass, "A custom runner for RcaIt"); + } + + @Override + public void run(RunNotifier notifier) { + try { + for (Method method : testClass.getMethods()) { + if (method.isAnnotationPresent(Test.class)) { + notifier.fireTestStarted(Description + .createTestDescription(testClass, method.getName())); + + try { + this.testEnvironment.updateEnvironment(method); + this.testEnvironment.verifyEnvironmentSetup(); + } catch (Exception ex) { + notifier.fireTestFailure( + new Failure( + Description.createTestDescription(testClass.getClass(), method.getName()), ex)); + } + cluster.startRcaScheduler(); + + try { + method.invoke(testObject); + List failedChecks = validate(method); + + if (!failedChecks.isEmpty()) { + StringBuilder sb = new StringBuilder("Failed validations for:"); + for (Class failed: failedChecks) { + sb.append(System.lineSeparator()).append(failed); + } + + notifier.fireTestFailure( + new Failure( + Description.createTestDescription(testClass.getClass(), method.getName()), + new AssertionError(sb.toString()))); + } + } catch (Exception exception) { + LOG.error("** ERR: While running method: '{}'", method.getName(), exception); + notifier.fireTestFailure( + new Failure( + Description.createTestDescription(testClass.getClass(), method.getName()), exception)); + } + + cluster.stopRcaScheduler(); + this.testEnvironment.clearUpMethodLevelEnvOverride(); + + notifier.fireTestFinished(Description.createTestDescription(testClass, method.getName())); + } + } + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + try { + cluster.deleteCluster(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + private List validate(Method method) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { + List failedValidations = new ArrayList<>(); + if (method.isAnnotationPresent(AExpect.Expectations.class) || method.isAnnotationPresent(AExpect.class)) { + AExpect[] expectations = method.getDeclaredAnnotationsByType(AExpect.class); + + IValidator[] validators = new IValidator[expectations.length]; + long maxWaitMillis = 0; + + // Initialization loop for validation and the maximum wait time for the checks to pass. + for (int i = 0; i < expectations.length; i++) { + AExpect expect = expectations[i]; + validators[i] = (IValidator) expect.validator().getDeclaredConstructor().newInstance(); + long timeOutMillis = TimeUnit.MILLISECONDS.convert(expect.timeoutSeconds(), TimeUnit.SECONDS); + if (timeOutMillis > maxWaitMillis) { + maxWaitMillis = timeOutMillis; + } + } + + long startMillis = System.currentTimeMillis(); + long endTimeMillis = startMillis + maxWaitMillis; + + + while (System.currentTimeMillis() <= endTimeMillis) { + failedValidations.clear(); + int passedCount = 0; + // All checks must pass for one run for the validations to succeed. It's not valid if + // different checks pass for different runs. + for (int i = 0; i < expectations.length; i++) { + // This is already initialized. Cannot be null. + IValidator validator = validators[i]; + AExpect expect = expectations[i]; + AExpect.Type what = expect.what(); + boolean successful = false; + + Class rca = expect.forRca(); + + switch (what) { + case REST_API: + successful = validator.check(testApi.getRcaDataOnHost(expect.on(), rca.getSimpleName())); + if (!successful) { + failedValidations.add(validator.getClass()); + } + break; + } + if (successful) { + passedCount += 1; + } + } + + if (passedCount == expectations.length) { + break; + } + } + } + return failedValidations; + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPocCoLocatedMaster.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPocCoLocatedMaster.java new file mode 100644 index 000000000..c72a4cfc5 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPocCoLocatedMaster.java @@ -0,0 +1,67 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.tests.poc; + +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.INDEX_NAME_VALUE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.OPERATION_VALUE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.SHARDID_VALUE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.SHARD_ROLE_VALUE; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.CPU_Utilization; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AClusterType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AExpect; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AMetric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ARcaGraph; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ATable; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ATuple; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.api.TestApi; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.ClusterType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.HostTag; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.runners.RcaItNotEncryptedRunner; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.tests.poc.validator.PocValidator; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(RcaItNotEncryptedRunner.class) + +@AClusterType(ClusterType.MULTI_NODE_CO_LOCATED_MASTER) +@ARcaGraph(RcaItPocSingleNode.SimpleAnalysisGraphForCoLocated.class) +@AMetric(name = CPU_Utilization.class, + dimensionNames = {SHARDID_VALUE, INDEX_NAME_VALUE, OPERATION_VALUE, SHARD_ROLE_VALUE}, + tables = { + @ATable(hostTag = HostTag.DATA_0, + tuple = { + @ATuple(dimensionValues = {"0", "logs", "bulk", "p"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 0.0), + @ATuple(dimensionValues = {"1", "logs", "bulk", "r"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 80.0), + @ATuple(dimensionValues = {"2", "logs", "bulk", "p"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 10.0) + } + ), + @ATable(hostTag = {HostTag.ELECTED_MASTER}, + tuple = { + @ATuple(dimensionValues = {"0", "logs", "bulk", "r"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 50.0), + @ATuple(dimensionValues = {"1", "logs", "bulk", "p"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 5.0), + @ATuple(dimensionValues = {"2", "logs", "bulk", "r"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 11.0) + } + ) + } +) +public class RcaItPocCoLocatedMaster { + private TestApi api; + + @Test + @AExpect( + what = AExpect.Type.REST_API, + on = HostTag.ELECTED_MASTER, + validator = PocValidator.class, + forRca = RcaItPocSingleNode.SimpleAnalysisGraphForCoLocated.ClusterRca.class) + public void simple() { + } + + public void setTestApi(final TestApi api) { + this.api = api; + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPocDedicated.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPocDedicated.java new file mode 100644 index 000000000..7d05310a6 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPocDedicated.java @@ -0,0 +1,93 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.tests.poc; + +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.INDEX_NAME_VALUE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.OPERATION_VALUE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.SHARDID_VALUE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.SHARD_ROLE_VALUE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.LOCUS_DATA_NODE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.TAG_LOCUS; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.CPU_Utilization; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AClusterType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AExpect; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AMetric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ARcaGraph; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ATable; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ATuple; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.api.TestApi; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.ClusterType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.HostTag; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.runners.RcaItNotEncryptedRunner; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.tests.poc.validator.PocValidator; +import java.util.Arrays; +import java.util.Collections; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(RcaItNotEncryptedRunner.class) + +@AClusterType(ClusterType.MULTI_NODE_DEDICATED_MASTER) +@ARcaGraph(RcaItPocDedicated.SimpleAnalysisGraphForDedicated.class) +@AMetric(name = CPU_Utilization.class, + dimensionNames = {SHARDID_VALUE, INDEX_NAME_VALUE, OPERATION_VALUE, SHARD_ROLE_VALUE}, + tables = { + @ATable(hostTag = HostTag.DATA_0, + tuple = { + @ATuple(dimensionValues = {"0", "logs", "bulk", "p"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 0.0), + @ATuple(dimensionValues = {"1", "logs", "bulk", "r"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 80.0), + @ATuple(dimensionValues = {"2", "logs", "bulk", "p"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 10.0) + } + ), + @ATable(hostTag = {HostTag.DATA_1}, + tuple = { + @ATuple(dimensionValues = {"0", "logs", "bulk", "r"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 50.0), + @ATuple(dimensionValues = {"1", "logs", "bulk", "p"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 5.0), + @ATuple(dimensionValues = {"2", "logs", "bulk", "r"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 11.0) + } + ) + } +) +public class RcaItPocDedicated { + private TestApi api; + + @Test + @AExpect( + what = AExpect.Type.REST_API, + on = HostTag.ELECTED_MASTER, + validator = PocValidator.class, + forRca = SimpleAnalysisGraphForDedicated.ClusterRca.class) + public void simple() { + } + + public void setTestApi(final TestApi api) { + this.api = api; + } + + + public static class SimpleAnalysisGraphForDedicated extends SimpleAnalysisGraph { + + @Override + public void construct() { + CPU_Utilization cpuUtilization = new CPU_Utilization(1); + cpuUtilization.addTag(TAG_LOCUS, LOCUS_DATA_NODE); + addLeaf(cpuUtilization); + + SimpleAnalysisGraph.NodeRca nodeRca = new SimpleAnalysisGraph.NodeRca(cpuUtilization); + nodeRca.addTag(TAG_LOCUS, LOCUS_DATA_NODE); + nodeRca.addAllUpstreams(Arrays.asList(cpuUtilization)); + + SimpleAnalysisGraph.ClusterRca clusterRca = new SimpleAnalysisGraph.ClusterRca(nodeRca); + clusterRca.addTag(TAG_LOCUS, LOCUS_MASTER_NODE); + clusterRca.addAllUpstreams(Collections.singletonList(nodeRca)); + clusterRca.addTag(TAG_AGGREGATE_UPSTREAM, LOCUS_DATA_NODE); + } + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPocSingleNode.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPocSingleNode.java new file mode 100644 index 000000000..76ccb4644 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPocSingleNode.java @@ -0,0 +1,84 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.tests.poc; + +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.INDEX_NAME_VALUE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.OPERATION_VALUE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.SHARDID_VALUE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.Constants.SHARD_ROLE_VALUE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.LOCUS_DATA_NODE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.TAG_LOCUS; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.CPU_Utilization; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AClusterType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AExpect; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.AMetric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ARcaGraph; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ATable; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.annotations.ATuple; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.api.TestApi; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.ClusterType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.configs.HostTag; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.runners.RcaItNotEncryptedRunner; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.tests.poc.validator.PocValidator; +import java.util.Arrays; +import java.util.Collections; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(RcaItNotEncryptedRunner.class) + +@AClusterType(ClusterType.SINGLE_NODE) +@ARcaGraph(RcaItPocSingleNode.SimpleAnalysisGraphForCoLocated.class) +@AMetric(name = CPU_Utilization.class, + dimensionNames = {SHARDID_VALUE, INDEX_NAME_VALUE, OPERATION_VALUE, SHARD_ROLE_VALUE}, + tables = { + @ATable(hostTag = HostTag.DATA_0, + tuple = { + @ATuple(dimensionValues = {"0", "logs", "bulk", "p"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 0.0), + @ATuple(dimensionValues = {"1", "logs", "bulk", "r"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 80.0), + @ATuple(dimensionValues = {"2", "logs", "bulk", "p"}, + sum = 0.0, avg = 0.0, min = 0.0, max = 10.0) + } + ) + } +) +public class RcaItPocSingleNode { + private TestApi api; + + @Test + @AExpect( + what = AExpect.Type.REST_API, + on = HostTag.DATA_0, + validator = PocValidator.class, + forRca = SimpleAnalysisGraphForCoLocated.ClusterRca.class) + public void simple() { + } + + public void setTestApi(final TestApi api) { + this.api = api; + } + + + public static class SimpleAnalysisGraphForCoLocated extends SimpleAnalysisGraph { + + @Override + public void construct() { + CPU_Utilization cpuUtilization = new CPU_Utilization(1); + cpuUtilization.addTag(TAG_LOCUS, LOCUS_DATA_MASTER_NODE); + addLeaf(cpuUtilization); + + SimpleAnalysisGraph.NodeRca nodeRca = new SimpleAnalysisGraph.NodeRca(cpuUtilization); + nodeRca.addTag(TAG_LOCUS, LOCUS_DATA_MASTER_NODE); + nodeRca.addAllUpstreams(Arrays.asList(cpuUtilization)); + + SimpleAnalysisGraph.ClusterRca clusterRca = new SimpleAnalysisGraph.ClusterRca(nodeRca); + clusterRca.addTag(TAG_LOCUS, LOCUS_MASTER_NODE); + clusterRca.addAllUpstreams(Collections.singletonList(nodeRca)); + clusterRca.addTag(TAG_AGGREGATE_UPSTREAM, LOCUS_DATA_NODE); + } + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/SimpleAnalysisGraph.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/SimpleAnalysisGraph.java new file mode 100644 index 000000000..0e0ea1c84 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/SimpleAnalysisGraph.java @@ -0,0 +1,168 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.tests.poc; + +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.INDEX_NAME; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension.SHARD_ID; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.CPU_Utilization; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotShardSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.ElasticSearchAnalysisGraph; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.hotshard.IndexShardKey; +import java.util.ArrayList; +import java.util.List; +import org.jooq.Record; + +public class SimpleAnalysisGraph extends ElasticSearchAnalysisGraph { + public static class NodeRca extends Rca> { + private final CPU_Utilization cpuUtilization; + + public NodeRca(CPU_Utilization cpu_utilization) { + super(1); + this.cpuUtilization = cpu_utilization; + } + + @Override + public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) { + final List flowUnitMessages = + args.getWireHopper().readFromWire(args.getNode()); + List> flowUnitList = new ArrayList<>(); + for (FlowUnitMessage flowUnitMessage : flowUnitMessages) { + flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage)); + } + setFlowUnits(flowUnitList); + } + + @Override + public ResourceFlowUnit operate() { + double maxCpu = 0; + IndexShardKey indexShardKey = null; + for (MetricFlowUnit metricFlowUnit : cpuUtilization.getFlowUnits()) { + if (metricFlowUnit.getData() != null) { + // Go through all the entries and find out the shard with the highest CPU + // utilization. + for (Record record : metricFlowUnit.getData()) { + try { + String indexName = record.getValue(INDEX_NAME.toString(), String.class); + // System.out.println(record); + Integer shardId = record.getValue(SHARD_ID.toString(), Integer.class); + if (indexName != null && shardId != null) { + double usage = record.getValue(MetricsDB.MAX, Double.class); + if (usage > maxCpu) { + maxCpu = usage; + indexShardKey = IndexShardKey.buildIndexShardKey(record); + } + } + } catch (IllegalArgumentException ex) { + + } + } + } + } + InstanceDetails instanceDetails = getInstanceDetails(); + HotNodeSummary nodeSummary = new HotNodeSummary(instanceDetails.getInstanceId(), + instanceDetails.getInstanceIp()); + ResourceFlowUnit rfu; + if (indexShardKey != null) { + //System.out.println("NodeRca running on " + instanceDetails.getInstanceId()); + + HotShardSummary summary = new HotShardSummary( + indexShardKey.getIndexName(), + String.valueOf(indexShardKey.getShardId()), + instanceDetails.getInstanceId().toString(), + 0); + summary.setcpuUtilization(maxCpu); + nodeSummary.appendNestedSummary(summary); + rfu = new ResourceFlowUnit<>( + System.currentTimeMillis(), + new ResourceContext(Resources.State.UNHEALTHY), + nodeSummary, + true); + + //System.out.println("NODE RCA: " + rfu); + } else { + rfu = new ResourceFlowUnit<>(System.currentTimeMillis()); + } + return rfu; + } + } + + public static class ClusterRca extends Rca> { + private final NodeRca nodeRca; + + public ClusterRca(NodeRca nodeRca) { + super(1); + this.nodeRca = nodeRca; + } + + @Override + public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) { + throw new IllegalArgumentException(name() + "'s generateFlowUnitListFromWire() should not " + + "be required."); + } + + // The cluster level RCA goes through all the nodeLevel summaries and then picks the node + // with the highest CPU and states which shard it is the highest for. + @Override + public ResourceFlowUnit operate() { + final List> resourceFlowUnits = nodeRca.getFlowUnits(); + HotClusterSummary summary = new HotClusterSummary( + getAllClusterInstances().size(), 1); + + final InstanceDetails.Id defaultId = new InstanceDetails.Id("default-id"); + final InstanceDetails.Ip defaultIp = new InstanceDetails.Ip("1.1.1.1"); + + InstanceDetails.Id hotNodeId = defaultId; + InstanceDetails.Ip hotsNodeAddr = defaultIp; + String hotShard = ""; + String hotShardIndex = ""; + double cpuUtilization = 0.0; + + for (final ResourceFlowUnit resourceFlowUnit : resourceFlowUnits) { + if (resourceFlowUnit.isEmpty()) { + continue; + } + HotNodeSummary nodeSummary = resourceFlowUnit.getSummary(); + HotShardSummary hotShardSummary = nodeSummary.getHotShardSummaryList().get(0); + double cpu = hotShardSummary.getCpuUtilization(); + if (cpu > cpuUtilization) { + hotNodeId = nodeSummary.getNodeID(); + hotsNodeAddr = nodeSummary.getHostAddress(); + hotShard = hotShardSummary.getShardId(); + hotShardIndex = hotShardSummary.getIndexName(); + cpuUtilization = cpu; + } + } + + ResourceFlowUnit rfu; + if (!hotNodeId.equals(defaultId)) { + HotClusterSummary hotClusterSummary = new HotClusterSummary( + getAllClusterInstances().size(), 1); + HotNodeSummary hotNodeSummary = new HotNodeSummary(hotNodeId, hotsNodeAddr); + HotShardSummary hotShardSummary = + new HotShardSummary(hotShardIndex, hotShard, hotNodeId.toString(), 0); + hotShardSummary.setcpuUtilization(cpuUtilization); + hotNodeSummary.appendNestedSummary(hotShardSummary); + hotClusterSummary.appendNestedSummary(hotNodeSummary); + + rfu = new ResourceFlowUnit<>( + System.currentTimeMillis(), + new ResourceContext(Resources.State.UNHEALTHY), + hotClusterSummary, true); + } else { + rfu = new ResourceFlowUnit<>(System.currentTimeMillis()); + } + //System.out.println("CLUSTER RCA: " + rfu); + return rfu; + } + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/validator/PocValidator.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/validator/PocValidator.java new file mode 100644 index 000000000..df0ec7930 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/validator/PocValidator.java @@ -0,0 +1,67 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.tests.poc.validator; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.integTests.framework.api.IValidator; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.util.List; +import org.jooq.Record; +import org.jooq.Result; +import org.junit.Assert; + +// Validators are only initialized once to evaluate a test method. +public class PocValidator implements IValidator { + long startTime; + + public PocValidator() { + startTime = System.currentTimeMillis(); + } + + /** + * {"rca_name":"ClusterRca", + * "timestamp":1596557050522, + * "state":"unhealthy", + * "HotClusterSummary":[ + * {"number_of_nodes":1,"number_of_unhealthy_nodes":1} + * ]} + */ + @Override + public boolean check(JsonElement response) { + JsonArray array = response.getAsJsonObject().get("data").getAsJsonArray(); + if (array.size() == 0) { + return false; + } + + for (int i = 0; i < array.size(); i++) { + JsonObject object = array.get(i).getAsJsonObject(); + if (object.get("rca_name").getAsString().equals("ClusterRca")) { + return checkClusterRca(object); + } + } + return false; + } + + /** + * {"rca_name":"ClusterRca", + * "timestamp":1597167456322, + * "state":"unhealthy", + * "HotClusterSummary":[{"number_of_nodes":1,"number_of_unhealthy_nodes":1}] + * } + */ + boolean checkClusterRca(JsonObject object) { + if (!"unhealthy".equals(object.get("state").getAsString())) { + return false; + } + + JsonElement elem = object.get("HotClusterSummary"); + if (elem == null) { + return false; + } + JsonArray array = elem.getAsJsonArray(); + + Assert.assertEquals(1, array.size()); + Assert.assertEquals(1, array.get(0).getAsJsonObject().get("number_of_unhealthy_nodes").getAsInt()); + + return true; + } +} diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml index d488053bb..7ee494a74 100644 --- a/src/test/resources/log4j2.xml +++ b/src/test/resources/log4j2.xml @@ -2,7 +2,7 @@ - + diff --git a/src/test/resources/rca/rca_elected_master.conf b/src/test/resources/rca/rca_elected_master.conf index 7d5987680..a5bcf5285 100644 --- a/src/test/resources/rca/rca_elected_master.conf +++ b/src/test/resources/rca/rca_elected_master.conf @@ -40,7 +40,8 @@ // How often the sqlite file be repeated in seconds. This file contains RCAs and therefore rotating it too frequently // might not be as fruitful as there might not be any data. - "rotation-period-seconds": 21600 + "rotation-period-seconds": 21600, + "storage-file-retention-count": 5 }, "rca-config-settings": { diff --git a/src/test/resources/rca/rca_master.conf b/src/test/resources/rca/rca_master.conf index 298cb4439..320269e7d 100644 --- a/src/test/resources/rca/rca_master.conf +++ b/src/test/resources/rca/rca_master.conf @@ -40,7 +40,8 @@ // How often the sqlite file be repeated in seconds. This file contains RCAs and therefore rotating it too frequently // might not be as fruitful as there might not be any data. - "rotation-period-seconds": 21600 + "rotation-period-seconds": 21600, + "storage-file-retention-count": 5 }, "rca-config-settings": { diff --git a/src/test/resources/tmp/file_rotate/rca.test.file b/src/test/resources/tmp/file_rotate/rca.test.file deleted file mode 100644 index e9f2cc09d..000000000 Binary files a/src/test/resources/tmp/file_rotate/rca.test.file and /dev/null differ diff --git a/src/test/resources/tmp/file_rotate/rca.test.file.2020-07-20-11-33-38 b/src/test/resources/tmp/file_rotate/rca.test.file.2020-07-20-11-33-38 deleted file mode 100644 index e9f2cc09d..000000000 Binary files a/src/test/resources/tmp/file_rotate/rca.test.file.2020-07-20-11-33-38 and /dev/null differ