diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerApp.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerApp.java index b3a791e4e..a9f37d8d9 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerApp.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerApp.java @@ -144,8 +144,14 @@ appContext, new MetricsDBProvider() public static Thread startRcaTopLevelThread(final RcaController rcaController1, final ThreadProvider threadProvider) { + return startRcaTopLevelThread(rcaController1, threadProvider, ""); + } + + public static Thread startRcaTopLevelThread(final RcaController rcaController1, + final ThreadProvider threadProvider, + String nodeName) { Thread rcaControllerThread = threadProvider.createThreadForRunnable(() -> rcaController1.run(), - PerformanceAnalyzerThreads.RCA_CONTROLLER); + PerformanceAnalyzerThreads.RCA_CONTROLLER, nodeName); rcaControllerThread.start(); return rcaControllerThread; } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java index 07c8aa6e6..8b2816ce5 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/RcaController.java @@ -64,6 +64,7 @@ import java.util.Objects; import java.util.Scanner; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -88,6 +89,8 @@ public class RcaController { private boolean rcaEnabledDefaultValue = false; + private final int WAIT_FOR_SCHED_START_SECS = 10; + // This needs to be volatile as the RcaConfPoller writes it but the Nanny reads it. private volatile boolean rcaEnabled = false; @@ -227,9 +230,17 @@ private void start() { new SubscribeServerHandler(subscriptionManager, networkThreadPoolReference)); Thread rcaSchedulerThread = threadProvider.createThreadForRunnable(() -> rcaScheduler.start(), - PerformanceAnalyzerThreads.RCA_SCHEDULER); + PerformanceAnalyzerThreads.RCA_SCHEDULER, + copyAppContext.getMyInstanceDetails().getInstanceId().toString()); + CountDownLatch schedulerStartLatch = new CountDownLatch(1); + rcaScheduler.setSchedulerTrackingLatch(schedulerStartLatch); rcaSchedulerThread.start(); + schedulerStartLatch.await(WAIT_FOR_SCHED_START_SECS, TimeUnit.SECONDS); + + if (rcaScheduler.getState() != RcaSchedulerState.STATE_STARTED) { + LOG.error("RCA scheduler didn't start within {} seconds", WAIT_FOR_SCHED_START_SECS); + } } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException @@ -311,7 +322,7 @@ public void run() { } tick++; } - LOG.info("RcaController exits.."); + LOG.error("RcaController exits.."); } /** diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileRotate.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileRotate.java index a5cd5b967..a76a2c7ae 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileRotate.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileRotate.java @@ -128,8 +128,6 @@ protected synchronized Path rotate(long currentMillis) throws IOException { Path ret; - LOG.info("About to rotate file: {} to {}", FILE_TO_ROTATE, targetFilePath); - // Fallback in rotating a file: // try 1. Rotate the file, don't try to replace the destination file if one exists. // try 2: Rotate the file now with replacement and add a log saying the destination file will be deleted. diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java index 79b9df4cd..b659aaa3c 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java @@ -84,8 +84,6 @@ enum RotationType { Path path = Paths.get(dir, filenameParam); fileRotate = new FileRotate(path, fileRotationTimeUnit, fileRotationPeriod, dateFormat); - - LOG.info("Force Rotating DB file during startup."); fileRotate.forceRotate(System.currentTimeMillis()); fileGC = new FileGC(Paths.get(dir), filenameParam, fileRotationTimeUnit, fileRotationPeriod, @@ -205,8 +203,8 @@ private synchronized void rotateRegisterGarbageThenCreateNewDB(RotationType type // If we are here that means the tryRotate or the forceRotate didn't throw exception and therefore, // the current DBFile does not exist anymore. We therefore should create a new one. if (fileRotate.getLastRotatedMillis() == currTime) { - LOG.info("Periodic file rotation type: {}", type.toString()); openNewDBFile(); + LOG.info("Created a new DB file."); } } @@ -220,19 +218,19 @@ private synchronized void rotateRegisterGarbageThenCreateNewDB(RotationType type * corrupted. * @throws IOException This is thrown if the attempt to create a new DB file fails. */ - private <T extends ResourceFlowUnit> void writeFlowUnit( + private synchronized <T extends ResourceFlowUnit> void writeFlowUnit( T flowUnit, String tableName) throws SQLException, IOException { try { tryWriteFlowUnit(flowUnit, tableName); } catch (SQLException | DataAccessException e) { LOG.info( - "RCA: Fail to write to table '{}', creating a new DB file and retrying write/create operation", tableName); + "RCA: Fail to write to table '{}', creating a new DB file and retrying write/create operation", tableName, e); rotateRegisterGarbageThenCreateNewDB(RotationType.FORCE_ROTATE); tryWriteFlowUnit(flowUnit, tableName); } } - private <T extends ResourceFlowUnit> void tryWriteFlowUnit( + private synchronized <T extends ResourceFlowUnit> void tryWriteFlowUnit( T flowUnit, String nodeName) throws SQLException, DataAccessException { String tableName = ResourceFlowUnit.RCA_TABLE_NAME; if (!tableNames.contains(tableName)) { @@ -251,14 +249,14 @@ private <T extends ResourceFlowUnit> void tryWriteFlowUnit( } /** recursively insert nested summary to sql tables */ - private void writeSummary( + private synchronized void writeSummary( GenericSummary summary, String referenceTable, String referenceTablePrimaryKeyFieldName, int referenceTablePrimaryKeyFieldValue) throws SQLException { String tableName = summary.getClass().getSimpleName(); if (!tableNames.contains(tableName)) { - LOG.info("RCA: Table '{}' does not exist. Creating one with columns: {}", tableName, summary.getSqlSchema()); + LOG.info("RCA: Summary table '{}' does not exist. Creating one with columns: {}", tableName, summary.getSqlSchema()); createTable(tableName, summary.getSqlSchema(), referenceTable, referenceTablePrimaryKeyFieldName); } List<Object> values = summary.getSqlValue(); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java index 313f0412e..2f36a3ca4 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java @@ -33,6 +33,8 @@ import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; @@ -80,6 +82,9 @@ class SQLitePersistor extends PersistorBase { // It is needed during SQLite file rotation @Override synchronized void createNewDSLContext() { + if (create != null) { + create.close(); + } create = DSL.using(super.conn, SQLDialect.SQLITE); jooqTableColumns = new HashMap<>(); } @@ -201,13 +206,12 @@ public synchronized List<Result<Record>> getRecordsForAllTables() { } @Override - public Result<Record> getRecordsForTable(String tableName) { + public synchronized Result<Record> getRecordsForTable(String tableName) { return getRecords(tableName); } - @Override - public List<String> getAllPersistedRcas() { + public synchronized List<String> getAllPersistedRcas() { List<String> tables = new ArrayList<>(); try { tables = @@ -220,7 +224,6 @@ public List<String> getAllPersistedRcas() { return tables; } - //read table content and convert it into JSON format private synchronized String readTable(String tableName) { String tableStr; @@ -373,8 +376,10 @@ private synchronized JsonElement getNonTemperatureRcas(String rca) { } } } catch (DataAccessException de) { - // it is totally fine if we fail to read some certain tables. - LOG.warn("Fail to read RCA : {}, query = {}, exceptions : {}", rca, rcaQuery.toString(), de); + if (!de.getMessage().contains("no such table")) { + // it is totally fine if we fail to read some certain tables. + LOG.warn("Fail to read RCA : {}.", rca, de); + } } JsonElement ret = null; if (response != null) { diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java index 666e71ab5..77b9bf576 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/scheduler/RCAScheduler.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.sql.SQLException; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -72,6 +73,8 @@ public class RCAScheduler { private static final Logger LOG = LogManager.getLogger(RCAScheduler.class); + private CountDownLatch schedulerTrackingLatch; + public RCAScheduler( List<ConnectedComponent> connectedComponents, Queryable db, @@ -111,10 +114,16 @@ public void start() { if (scheduledPool == null) { LOG.error("Couldn't start RCA scheduler. Executor pool is not set."); + if (schedulerTrackingLatch != null) { + schedulerTrackingLatch.countDown(); + } return; } if (role == NodeRole.UNKNOWN) { LOG.error("Couldn't start RCA scheduler as the node role is UNKNOWN."); + if (schedulerTrackingLatch != null) { + schedulerTrackingLatch.countDown(); + } return; } @@ -130,6 +139,9 @@ public void start() { schedulerState = RcaSchedulerState.STATE_STARTED; LOG.info("RCA scheduler thread started successfully on node: {}", appContext.getMyInstanceDetails().getInstanceId()); + if (schedulerTrackingLatch != null) { + schedulerTrackingLatch.countDown(); + } while (schedulerState == RcaSchedulerState.STATE_STARTED) { try { @@ -159,6 +171,7 @@ public void shutdown() { LOG.info("Shutting down the scheduler.."); shutdownRequested = true; scheduledPool.shutdown(); + waitForShutdown(scheduledPool); rcaSchedulerPeriodicExecutor.shutdown(); waitForShutdown(rcaSchedulerPeriodicExecutor); try { @@ -168,6 +181,9 @@ public void shutdown() { "RCA: Error while closing the DB connection: {}::{}", e.getErrorCode(), e.getCause()); } schedulerState = RcaSchedulerState.STATE_STOPPED; + if (schedulerTrackingLatch != null) { + schedulerTrackingLatch.countDown(); + } } private void waitForShutdown(ExecutorService execPool) { @@ -194,6 +210,10 @@ public NodeRole getRole() { return role; } + public void setSchedulerTrackingLatch(final CountDownLatch schedulerTrackingLatch) { + this.schedulerTrackingLatch = schedulerTrackingLatch; + } + @VisibleForTesting public void setQueryable(Queryable queryable) { this.db = queryable; diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/threads/ThreadProvider.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/threads/ThreadProvider.java index 0cb36181b..21e3b20aa 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/threads/ThreadProvider.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/threads/ThreadProvider.java @@ -41,7 +41,13 @@ public class ThreadProvider { * @return The thread with the wrapped runnable. */ public Thread createThreadForRunnable(final Runnable innerRunnable, - final PerformanceAnalyzerThreads paThread) { + final PerformanceAnalyzerThreads paThread, String threadNameAppender) { + StringBuilder threadName = new StringBuilder(paThread.toString()); + if (!threadNameAppender.isEmpty()) { + threadName.append("-").append(threadNameAppender); + } + String threadNameStr = threadName.toString(); + Thread t = new Thread(() -> { try { innerRunnable.run(); @@ -56,11 +62,16 @@ public Thread createThreadForRunnable(final Runnable innerRunnable, } } StatsCollector.instance().logMetric(PA_THREADS_ENDED_METRIC_NAME); - LOG.info("Thread: {} completed.", paThread.toString()); - }, paThread.toString()); + LOG.info("Thread: {} completed.", threadNameStr); + }, threadNameStr); - LOG.info("Spun up a thread with name: {}", paThread.toString()); + LOG.info("Spun up a thread with name: {}", threadNameStr); StatsCollector.instance().logMetric(PA_THREADS_STARTED_METRIC_NAME); return t; } + + public Thread createThreadForRunnable(final Runnable innerRunnable, + final PerformanceAnalyzerThreads paThread) { + return createThreadForRunnable(innerRunnable, paThread, ""); + } } diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Cluster.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Cluster.java index 95ea2dab6..e60bd7173 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Cluster.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Cluster.java @@ -95,7 +95,9 @@ public Cluster(final ClusterType type, final File clusterDir, final boolean useH this.hostList = new ArrayList<>(); this.roleToHostMap = new HashMap<>(); this.clusterDir = clusterDir; - this.rcaEnabled = true; + // We start off with the RCA turned off and turn it on only right before we + // invoke a test method. + this.rcaEnabled = false; this.useHttps = useHttps; this.threadProvider = new ThreadProvider(); this.exceptionQueue = new ArrayBlockingQueue<>(1); @@ -186,7 +188,7 @@ private HostTag getTagForHostIdForHostTagAssignment(int hostId) { throw new IllegalStateException("No cluster type matches"); } - public void createServersAndThreads() throws Exception { + public void createServersAndThreads() { this.errorHandlingThread = PerformanceAnalyzerApp.startErrorHandlingThread(threadProvider, exceptionQueue); for (Host host : hostList) { host.createServersAndThreads(threadProvider); diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Host.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Host.java index bf26d0765..46dad2540 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Host.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/framework/Host.java @@ -53,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -139,14 +140,14 @@ private static File createHostDir(File clusterDir, HostTag hostTag) { return hostFile; } - public void createServersAndThreads(final ThreadProvider threadProvider) throws Exception { + public void createServersAndThreads(final ThreadProvider threadProvider) { this.threadProvider = threadProvider; Objects.requireNonNull(appContext.getClusterDetailsEventProcessor(), "ClusterDetailsEventProcessor cannot be null in the AppContext"); rcaEnabledFile = Paths.get(hostDir.getAbsolutePath(), RcaController.RCA_ENABLED_CONF_FILE); RcaSchedulerState state = rcaEnabled ? RcaSchedulerState.STATE_STARTED : RcaSchedulerState.STATE_STOPPED; - setRcaState(state); + setExpectedRcaState(state); this.connectionManager = new GRPCConnectionManager(useHttps); this.clientServers = PerformanceAnalyzerApp.createClientServers(connectionManager, @@ -179,7 +180,7 @@ public void createServersAndThreads(final ThreadProvider threadProvider) throws } // We create a temporary file and then swap it for the rca.enabled file. - public void setRcaState(RcaSchedulerState rcaState) { + public void setExpectedRcaState(RcaSchedulerState rcaState) { Path rcaEnabledTmp = Paths.get(rcaEnabledFile + ".tmp"); try (FileWriter f2 = new FileWriter(rcaEnabledTmp.toFile(), false /*To create a new file*/)) { boolean value = true; @@ -282,19 +283,29 @@ public void deleteHostDir() throws IOException { } public void stopRcaScheduler() throws Exception { - setRcaState(RcaSchedulerState.STATE_STOPPED); - rcaController.waitForRcaState(RcaSchedulerState.STATE_STOPPED); - LOG.info("RCA Scheduler STOPPED"); + RCAScheduler sched = rcaController.getRcaScheduler(); + CountDownLatch shutdownLatch = null; + if (sched != null) { + shutdownLatch = new CountDownLatch(1); + sched.setSchedulerTrackingLatch(shutdownLatch); + } + setExpectedRcaState(RcaSchedulerState.STATE_STOPPED); + if (shutdownLatch != null) { + shutdownLatch.await(10, TimeUnit.SECONDS); + } + LOG.info("RCA Scheduler is STOPPED by TestRunner on node: {}", myTag); } public void startRcaControllerThread() { - this.rcaControllerThread = PerformanceAnalyzerApp.startRcaTopLevelThread(rcaController, threadProvider); + this.rcaControllerThread = PerformanceAnalyzerApp.startRcaTopLevelThread( + rcaController, + threadProvider, + appContext.getMyInstanceDetails().getInstanceId().toString()); } public void startRcaScheduler() throws Exception { - setRcaState(RcaSchedulerState.STATE_STARTED); + setExpectedRcaState(RcaSchedulerState.STATE_STARTED); rcaController.waitForRcaState(RcaSchedulerState.STATE_STARTED); - LOG.info("RCA scheduler STARTED successfully on host: {}.", myTag); } public void updateRcaGraph(final Class rcaGraphClass) diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPoc.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPocCoLocatedMaster.java similarity index 98% rename from src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPoc.java rename to src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPocCoLocatedMaster.java index 88b1c642f..632829a90 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPoc.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/RcaItPocCoLocatedMaster.java @@ -49,7 +49,7 @@ ) } ) -public class RcaItPoc { +public class RcaItPocCoLocatedMaster { private TestApi api; @Test diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/validator/PocValidator.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/validator/PocValidator.java index ebb93165c..ff83d7675 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/validator/PocValidator.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/integTests/tests/poc/validator/PocValidator.java @@ -54,7 +54,11 @@ boolean checkClusterRca(JsonObject object) { return false; } - JsonArray array = object.get("HotClusterSummary").getAsJsonArray(); + JsonElement elem = object.get("HotClusterSummary"); + if (elem == null) { + return false; + } + JsonArray array = elem.getAsJsonArray(); Assert.assertEquals(1, array.size()); Assert.assertEquals(1, array.get(0).getAsJsonObject().get("number_of_unhealthy_nodes").getAsInt()); diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml index d488053bb..7ee494a74 100644 --- a/src/test/resources/log4j2.xml +++ b/src/test/resources/log4j2.xml @@ -2,7 +2,7 @@ <Configuration status="INFO"> <Appenders> <Console name="Console" target="SYSTEM_OUT"> - <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" /> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{1}:%M:%L - %msg%n" /> </Console> <File name="PerformanceAnalyzerLog" fileName="${log4j:configParentLocation}/log/PerformanceAnalyzer.log" immediateFlush="true" append="true"> <PatternLayout pattern="%d{yyy-MM-dd HH:mm:ss.SSS} [PA:Reader] [%t] %-5level %logger{36} - %msg%n"/>