This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Integration test framework to test RCAs and decision Makers #301

Merged 6 commits on Aug 11, 2020

5 changes: 5 additions & 0 deletions CONTRIBUTING.md
@@ -61,6 +61,11 @@ e.g.
./gradlew test --tests RCASchedulerTaskTests.getLocallyExecutableNodes
```

### Adding your own tests
- You can add unit tests using the JUnit framework.
- There is also a mechanism to add integration tests for the RCA framework. For details, please
see [here](docs/rcait.md).

Before submitting the PR, we request that you also run
```shell script
./gradlew build
97 changes: 97 additions & 0 deletions docs/rcait.md
@@ -0,0 +1,97 @@
# RCA integration test framework

## Scope
The framework tests scenarios in which multiple RCA Schedulers run on different hosts of a
cluster. It lets you inject a custom RCA graph, specify the metrics that should flow through
the rca-dataflow-graph, and then test the results of the RCAs and the decisions based on
them, either by querying the rca.sqlite files on a particular host or by hitting the REST
endpoint on a particular host.

## Out of Scope
This framework does not cover testing the PerformanceAnalyzer Reader component, the Writer
component, or how PerformanceAnalyzer interacts with Elasticsearch.

## How to write your own tests using this framework?
The RCA-IT is composed of various annotations that you can use to configure the
test environment you want your tests to run on.

__`@RunWith(RcaItNotEncryptedRunner.class)`__

This annotation specifies the runner for the JUnit test class. In this case, it tells JUnit
to delegate to one of the RCA-IT runners, _RcaItNotEncryptedRunner_. All RCA-IT tests must
use this annotation to be run by this integration test framework.

__`@AClusterType(ClusterType.MULTI_NODE_CO_LOCATED_MASTER)`__

This annotation tells the RCA-IT to use a multi-node cluster with no dedicated master nodes.
The kinds of clusters supported today are `SINGLE_NODE`, `MULTI_NODE_CO_LOCATED_MASTER`
and `MULTI_NODE_DEDICATED_MASTER`. This is a required annotation and must be specified at
the class level.

__`@ARcaGraph(MyRcaGraph.class)`__

This annotation specifies the graph to use for this test. It can be an existing graph or
one contrived specially for this test.

__`@AMetric`__

This annotation specifies the metrics that will be poured over the RCA graph. It has multiple
sub-fields.
- _name_ : The metric we are filling in. The expected parameter is one of the metrics classes
in `com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics`. The
metrics class that you specify should have a `static final` field called `NAME` (for example,
`CPU_Utilization`), which is used to determine the metric table.
- _dimensionNames_ : For the dimension names of a metric, please refer to the docs
[here](https://opendistro.github.io/for-elasticsearch-docs/docs/pa/reference/).
- _tables_ : This specifies a table for the metric. The table should be a 5-second snapshot
of the metrics, similar to what exists in metricsdb files. The field is an array type,
so you can specify a different metrics table for different nodes in the cluster. This can
be used to push different metrics to the node that we want to be marked unhealthy than to
all other nodes in the cluster.
    - _hostTag_ : The node of the cluster on which this metric table should be emitted.
    - _tuple_ : This is an array type that can be used to specify the rows in the table. A
    row should be an n-tuple, where n is the number of dimensions the metric has plus
    the 4 aggregate columns that all metricsdb files have: `min`, `max`, `sum` and `avg`.
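
As a rough illustration of how these sub-fields nest, the sketch below declares stand-in annotations and checks that every row supplies one value per dimension. `MetricSketch`, `TableSketch`, `TupleSketch`, the `dimensionValues` field, the string host tags and all sample values are assumptions made for this example; the real RCA-IT annotations take a metrics class for `name` and may differ in names and types.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Hypothetical stand-ins for the RCA-IT annotations; names and field types are assumptions.
@Retention(RetentionPolicy.RUNTIME)
@interface TupleSketch {
    String[] dimensionValues();  // one value per dimension name (assumed field)
    double min();
    double max();
    double sum();
    double avg();
}

@Retention(RetentionPolicy.RUNTIME)
@interface TableSketch {
    String hostTag();            // which node emits this table (a String here for simplicity)
    TupleSketch[] tuple();       // the rows of the table
}

@Retention(RetentionPolicy.RUNTIME)
@interface MetricSketch {
    String name();
    String[] dimensionNames();
    TableSketch[] tables();
}

// One table per node: the node to be marked unhealthy gets higher values than its peer.
@MetricSketch(
    name = "CPU_Utilization",
    dimensionNames = {"ShardID", "IndexName", "Operation", "ShardRole"},
    tables = {
        @TableSketch(hostTag = "DATA_0", tuple = {
            @TupleSketch(dimensionValues = {"0", "logs", "bulk", "primary"},
                min = 90.0, max = 90.0, sum = 90.0, avg = 90.0)
        }),
        @TableSketch(hostTag = "DATA_1", tuple = {
            @TupleSketch(dimensionValues = {"1", "logs", "bulk", "replica"},
                min = 5.0, max = 5.0, sum = 5.0, avg = 5.0)
        })
    })
class MetricSketchExample {
    // Returns the column count of each row: n dimensions plus min, max, sum and avg.
    static int columnsPerRow(Class<?> testClass) {
        MetricSketch metric = testClass.getAnnotation(MetricSketch.class);
        int dimensions = metric.dimensionNames().length;
        for (TableSketch table : metric.tables()) {
            for (TupleSketch row : table.tuple()) {
                if (row.dimensionValues().length != dimensions) {
                    throw new IllegalArgumentException(
                        "Row on " + table.hostTag() + " must have " + dimensions + " dimension values");
                }
            }
        }
        return dimensions + 4;
    }

    public static void main(String[] args) {
        System.out.println(columnsPerRow(MetricSketchExample.class));
    }
}
```

With four dimension names, each row carries eight columns: the four dimension values followed by the four aggregates.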

__`@Expect`__

This is an optional annotation that can be used only at the method level. It provides an easy
way to validate the result of the test. The annotation has 4 sub-fields:
- what : What we are testing for: data in the rca.sqlite file or the response returned by the
REST API.
- on : The node on which the framework should look for the expected data.
- forRca : The particular RCA whose value we are testing.
- validator : The class, defined by the test writer, that implements the `IValidator`
interface. Once the framework gathers the data for the mentioned RCA from the given node,
it passes the data to the validator, which returns whether the check passed.

The Expect annotation is a repeatable type, so you can expect multiple things from
the test at steady state. For example, you can have two expectations: one for the RCA on a
data node and the other for the RCA on the master. If the expectations are not met in the
i-th iteration, the framework re-runs them in the (i+1)-th iteration, until a timeout. The
timeout is configurable to any number of seconds using the field `timeoutSeconds` and
defaults to 60 seconds.
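
The retry-until-timeout behaviour described above can be pictured with a small polling loop. This is only a sketch of the general technique, not the framework's code: `ValidatorSketch`, `ExpectationPoller` and the polling interval are hypothetical, and the real `IValidator` interface may have a different shape.

```java
import java.util.function.Supplier;

// Hypothetical validator shape; the real IValidator interface may differ.
interface ValidatorSketch<T> {
    boolean check(T data);
}

class ExpectationPoller {
    // Re-evaluates the expectation every intervalMillis until it passes or the timeout elapses.
    static <T> boolean pollUntilValid(Supplier<T> fetch, ValidatorSketch<T> validator,
                                      long timeoutMillis, long intervalMillis) {
        final long start = System.nanoTime();
        final long timeoutNanos = timeoutMillis * 1_000_000L;
        while (true) {
            if (validator.check(fetch.get())) {
                return true;
            }
            if (System.nanoTime() - start >= timeoutNanos) {
                return false;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        int[] iteration = {0};
        // The data only satisfies the validator from the third fetch onwards.
        boolean passed = ExpectationPoller.<Integer>pollUntilValid(
            () -> ++iteration[0], n -> n >= 3, 60_000, 10);
        System.out.println(passed + " after " + iteration[0] + " iterations");
    }
}
```

The check runs once before the first sleep, so an expectation that already holds passes without waiting.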

A test class can get access to the programmatic API to get information about the hosts in the
cluster or about a particular host. To do so, the test class declares a method named
`setTestApi(final TestApi api)`, and the test runner calls this setter to give the test class
a reference to the `TestApi`.
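
One way a runner could hand over the API is to look the setter up by name through reflection. The following is a minimal sketch under that assumption, not the framework's actual implementation; `TestApiSketch`, `SampleRcaIt` and `SetterInjector` are hypothetical names introduced for illustration.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for the framework's TestApi.
class TestApiSketch {
    private final int hostCount;

    TestApiSketch(int hostCount) {
        this.hostCount = hostCount;
    }

    int getHostCount() {
        return hostCount;
    }
}

// A test class opts in simply by declaring the setter.
class SampleRcaIt {
    private TestApiSketch api;

    public void setTestApi(final TestApiSketch api) {
        this.api = api;
    }

    TestApiSketch getApi() {
        return api;
    }
}

class SetterInjector {
    // Calls setTestApi on the test instance if it declares one; returns whether it did.
    static boolean injectIfDeclared(Object testInstance, TestApiSketch api) {
        try {
            Method setter = testInstance.getClass().getMethod("setTestApi", TestApiSketch.class);
            setter.setAccessible(true); // the test class itself may not be public
            setter.invoke(testInstance, api);
            return true;
        } catch (NoSuchMethodException e) {
            return false; // the setter is optional
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not call setTestApi", e);
        }
    }

    public static void main(String[] args) {
        SampleRcaIt test = new SampleRcaIt();
        injectIfDeclared(test, new TestApiSketch(3));
        System.out.println(test.getApi().getHostCount());
    }
}
```

Treating a missing setter as a normal case keeps the API optional for test classes that do not need it.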

## Framework deep dive
This section might be of interest to you if you are trying to enhance the test framework
itself. If you just want to add more integration tests, you may choose to skip this section.

The framework consists of four main classes:
1. `RcaItRunnerBase` : This is the JUnit Runner that is used to run all rca-it tests. It
orchestrates the environment creation, initializes the test class and then executes the methods
annotated with `@Test`.

2. `TestEnvironment` : The RCA-IT environment is defined by the RCA graph it will be running, the
metrics that will flow through the dataflow pipelines and the rca.conf that will be used by the
hosts.

3. `Cluster` and `Host` classes: These classes initialize multiple `RcaController` threads,
each representing the RCA framework running on a node of the cluster. In the constructors, we
create all the objects and create a directory per host where the _rca.sqlite_ and _rca_enabled_
files are written. In the second phase, a call to `createServersAndThreads` creates all the
HTTP and gRPC servers (one per host). Then we start the `RcaController` thread.
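
The two-phase startup of `Cluster` and `Host` can be sketched roughly as follows. Everything here is a simplified assumption for illustration: `ClusterSketch`, `HostSketch`, the directory names and the latch are hypothetical, no real servers are created, and the controller thread only signals that it ran.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class HostSketch {
    final Path dataDir;
    private Thread controllerThread;

    // Phase 1: create objects and the per-host directory only.
    HostSketch(Path clusterDir, int hostId) {
        try {
            this.dataDir = Files.createDirectories(clusterDir.resolve("host" + hostId));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Phase 2: servers would be created here; this sketch only prepares the controller thread.
    void createServersAndThreads(CountDownLatch running) {
        controllerThread = new Thread(running::countDown, "rca-controller-" + dataDir.getFileName());
    }

    void startController() {
        controllerThread.start();
    }
}

class ClusterSketch {
    final List<HostSketch> hosts = new ArrayList<>();
    private final CountDownLatch running;

    ClusterSketch(Path clusterDir, int hostCount) {
        running = new CountDownLatch(hostCount);
        for (int i = 0; i < hostCount; i++) {
            hosts.add(new HostSketch(clusterDir, i));
        }
    }

    static ClusterSketch inTempDir(int hostCount) {
        try {
            return new ClusterSketch(Files.createTempDirectory("rca-it-sketch"), hostCount);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void createServersAndThreads() {
        hosts.forEach(host -> host.createServersAndThreads(running));
    }

    void startControllers() {
        hosts.forEach(HostSketch::startController);
    }

    // Waits until every controller thread has run, or the timeout elapses.
    boolean awaitControllers(long timeoutSeconds) {
        try {
            return running.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A caller would construct the cluster, call `createServersAndThreads()`, then `startControllers()`, and finally wait with `awaitControllers(...)`. Keeping construction free of threads and servers means a failure in the second phase leaves only directories to clean up.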
@@ -28,6 +28,7 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.NetClient;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.NetServer;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaController;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.MetricsDBProvider;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.JvmMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
@@ -136,14 +137,21 @@ private static void startRcaTopLevelThread(final ClientServers clientServers,
Util.DATA_DIR,
RcaConsts.RCA_STATE_CHECK_INTERVAL_IN_MS,
RcaConsts.nodeRolePollerPeriodicityInSeconds * 1000,
appContext
appContext, new MetricsDBProvider()
);
startRcaTopLevelThread(rcaController, threadProvider);
}

public static Thread startRcaTopLevelThread(final RcaController rcaController1, final ThreadProvider threadProvider) {
public static Thread startRcaTopLevelThread(final RcaController rcaController1,
final ThreadProvider threadProvider) {
return startRcaTopLevelThread(rcaController1, threadProvider, "");
}

public static Thread startRcaTopLevelThread(final RcaController rcaController1,
final ThreadProvider threadProvider,
String nodeName) {
Thread rcaControllerThread = threadProvider.createThreadForRunnable(() -> rcaController1.run(),
PerformanceAnalyzerThreads.RCA_CONTROLLER);
PerformanceAnalyzerThreads.RCA_CONTROLLER, nodeName);
rcaControllerThread.start();
return rcaControllerThread;
}
@@ -63,6 +63,7 @@ public static HttpServer createInternalServer(String portFromSetting, String hos
server.setExecutor(Executors.newCachedThreadPool());
return server;
} catch (java.net.BindException ex) {
LOG.error("Could not create HttpServer on port {}", internalPort, ex);
Runtime.getRuntime().halt(1);
} catch (Exception ex) {
ex.printStackTrace();
@@ -64,6 +64,7 @@
import java.util.Objects;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -88,6 +89,8 @@ public class RcaController {

private boolean rcaEnabledDefaultValue = false;

private final int WAIT_FOR_SCHED_START_SECS = 10;

// This needs to be volatile as the RcaConfPoller writes it but the Nanny reads it.
private volatile boolean rcaEnabled = false;

@@ -96,6 +99,7 @@ public class RcaController {

// This needs to be volatile as the NodeRolePoller writes it but the Nanny reads it.
protected volatile NodeRole currentRole = NodeRole.UNKNOWN;
private volatile List<ConnectedComponent> connectedComponents;

private final ThreadProvider threadProvider;
private RCAScheduler rcaScheduler;
@@ -124,7 +128,9 @@ public class RcaController {

private final AppContext appContext;

protected Queryable dbProvider = null;
protected volatile Queryable dbProvider = null;

private volatile Persistable persistenceProvider;

public RcaController(
final ThreadProvider threadProvider,
@@ -134,7 +140,8 @@ public RcaController(
final String rca_enabled_conf_location,
final long rcaStateCheckIntervalMillis,
final long nodeRoleCheckPeriodicityMillis,
final AppContext appContext) {
final AppContext appContext,
final Queryable dbProvider) {
this.threadProvider = threadProvider;
this.appContext = appContext;
this.netOpsExecutorService = netOpsExecutorService;
@@ -151,6 +158,9 @@ public RcaController(
this.rcaStateCheckIntervalMillis = rcaStateCheckIntervalMillis;
this.roleCheckPeriodicity = nodeRoleCheckPeriodicityMillis;
this.deliberateInterrupt = false;
this.connectedComponents = null;
this.dbProvider = dbProvider;
this.persistenceProvider = null;
}

@VisibleForTesting
@@ -162,6 +172,7 @@ public RcaController() {
rcaStateCheckIntervalMillis = 0;
roleCheckPeriodicity = 0;
appContext = null;
this.persistenceProvider = null;
}

protected List<ConnectedComponent> getRcaGraphComponents(
@@ -177,25 +188,22 @@ private void start() {
try {
Objects.requireNonNull(subscriptionManager);
Objects.requireNonNull(rcaConf);
if (dbProvider == null) {
return;
}

subscriptionManager.setCurrentLocus(rcaConf.getTagMap().get("locus"));
List<ConnectedComponent> connectedComponents = getRcaGraphComponents(rcaConf);
this.connectedComponents = getRcaGraphComponents(rcaConf);

// Mute the rca nodes after the graph creation and before the scheduler start
readAndUpdateMutedComponentsDuringStart();

Queryable db;
if (dbProvider == null) {
db = new MetricsDBProvider();
} else {
db = dbProvider;
}
ThresholdMain thresholdMain = new ThresholdMain(RcaConsts.THRESHOLDS_PATH, rcaConf);
Persistable persistable = PersistenceFactory.create(rcaConf);
persistenceProvider = PersistenceFactory.create(rcaConf);
networkThreadPoolReference
.set(RcaControllerHelper.buildNetworkThreadPool(rcaConf.getNetworkQueueLength()));
addRcaRequestHandler();
queryRcaRequestHandler.setPersistable(persistable);
queryRcaRequestHandler.setPersistable(persistenceProvider);
receivedFlowUnitStore = new ReceivedFlowUnitStore(rcaConf.getPerVertexBufferLength());
WireHopper net =
new WireHopper(nodeStateManager, rcaNetClient, subscriptionManager,
@@ -208,10 +216,10 @@ private void start() {

this.rcaScheduler =
new RCAScheduler(connectedComponents,
db,
dbProvider,
rcaConf,
thresholdMain,
persistable,
persistenceProvider,
net,
copyAppContext);

@@ -221,9 +229,21 @@ private void start() {
new SubscribeServerHandler(subscriptionManager, networkThreadPoolReference));

Thread rcaSchedulerThread = threadProvider.createThreadForRunnable(() -> rcaScheduler.start(),
PerformanceAnalyzerThreads.RCA_SCHEDULER);
PerformanceAnalyzerThreads.RCA_SCHEDULER,
copyAppContext.getMyInstanceDetails().getInstanceId().toString());

CountDownLatch schedulerStartLatch = new CountDownLatch(1);
rcaScheduler.setSchedulerTrackingLatch(schedulerStartLatch);
rcaSchedulerThread.start();
if (!schedulerStartLatch.await(WAIT_FOR_SCHED_START_SECS, TimeUnit.SECONDS)) {
LOG.error("Failed to start RcaScheduler.");
throw new IllegalStateException(
"Failed to start RcaScheduler within " + WAIT_FOR_SCHED_START_SECS + " seconds.");
}

if (rcaScheduler.getState() != RcaSchedulerState.STATE_STARTED) {
LOG.error("RCA scheduler didn't start within {} seconds", WAIT_FOR_SCHED_START_SECS);
}
Comment on lines +244 to +246
Contributor

Is there a reason we don't throw after this?

} catch (ClassNotFoundException
| NoSuchMethodException
| InvocationTargetException
@@ -232,8 +252,9 @@ private void start() {
| MalformedConfig
| SQLException
| IOException e) {
LOG.error("Couldn't build connected components or persistable.. Ran into {}", e.getMessage());
e.printStackTrace();
LOG.error("Couldn't build connected components or persistable..", e);
} catch (Exception ex) {
LOG.error("Couldn't start RcaController", ex);
}
}

@@ -262,6 +283,10 @@ private void restart() {
StatsCollector.instance().logMetric(RcaConsts.RCA_SCHEDULER_RESTART_METRIC);
}

protected RcaConf getRcaConfForMyRole(NodeRole role) {
return RcaControllerHelper.pickRcaConfForRole(role);
}

public void run() {
long tick = 0;
long nodeRoleCheckInTicks = roleCheckPeriodicity / rcaStateCheckIntervalMillis;
@@ -279,7 +304,7 @@ public void run() {

// If RCA is enabled, update Analysis graph with Muted RCAs value
if (rcaEnabled) {
rcaConf = RcaControllerHelper.pickRcaConfForRole(currentRole);
rcaConf = getRcaConfForMyRole(currentRole);
LOG.debug("Updating Analysis Graph with Muted RCAs");
readAndUpdateMutedComponents();
}
@@ -290,14 +315,17 @@ public void run() {
Thread.sleep(rcaStateCheckIntervalMillis - duration);
}
} catch (InterruptedException ie) {
if (!deliberateInterrupt) {
LOG.error("RCA controller thread was interrupted. Reason: {}", ie.getMessage());
LOG.error(ie);
if (deliberateInterrupt) {
// This should only happen in case of tests. So, its okay for this log level to be info.
LOG.info("RcaController thread interrupted..");
} else {
LOG.error("RCA controller thread was interrupted.", ie);
}
break;
}
tick++;
}
LOG.error("RcaController exits..");
}

/**
@@ -310,7 +338,12 @@ private void readRcaEnabledFromConf() {
() -> {
try (Scanner sc = new Scanner(filePath)) {
String nextLine = sc.nextLine();
rcaEnabled = Boolean.parseBoolean(nextLine);
boolean oldVal = rcaEnabled;
boolean newVal = Boolean.parseBoolean(nextLine);
if (oldVal != newVal) {
rcaEnabled = newVal;
LOG.info("RCA enabled changed from {} to {}", oldVal, newVal);
}
} catch (IOException e) {
LOG.error("Error reading file '{}': {}", filePath.toString(), e);
e.printStackTrace();
@@ -336,7 +369,8 @@ private void readAndUpdateMutedComponentsDuringStart() {

private boolean updateMutedComponents() {
try {
if (ConnectedComponent.getNodeNames().isEmpty()) {
Set<String> allNodes = ConnectedComponent.getNodesForAllComponents(this.connectedComponents);
if (allNodes.isEmpty()) {
LOG.info("Analysis graph not initialized/has been reset; returning.");
return false;
}
@@ -350,7 +384,7 @@ private boolean updateMutedComponents() {
LOG.info("Actions provided for muting: {}", actionsForMute);

// Update rcasForMute to retain only valid RCAs
graphNodesForMute.retainAll(ConnectedComponent.getNodeNames());
graphNodesForMute.retainAll(allNodes);

// If rcasForMute post validation is empty but neither rcaConf.getMutedRcaList() nor
// rcaConf.getMutedDeciderList() are empty all the input RCAs/deciders are incorrect.
@@ -359,11 +393,11 @@ private boolean updateMutedComponents() {
if (lastModifiedTimeInMillisInMemory == 0) {
LOG.error(
"Removing Incorrect RCA(s): {} provided before RCA Scheduler start. Valid RCAs: {}.",
rcaConf.getMutedRcaList(), ConnectedComponent.getNodeNames());
rcaConf.getMutedRcaList(), allNodes);

} else {
LOG.error("Incorrect RCA(s): {}, cannot be muted. Valid RCAs: {}, Muted RCAs: {}",
rcaConf.getMutedRcaList(), ConnectedComponent.getNodeNames(),
rcaConf.getMutedRcaList(), allNodes,
Stats.getInstance().getMutedGraphNodes());
return false;
}
@@ -480,4 +514,13 @@ public void setDbProvider(Queryable dbProvider) {
this.dbProvider = dbProvider;
}

@VisibleForTesting
public List<ConnectedComponent> getConnectedComponents() {
return connectedComponents;
}

@VisibleForTesting
public Persistable getPersistenceProvider() {
return persistenceProvider;
}
}