Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Fixed a bug where the database was being corrupted because of multipl…
Browse files Browse the repository at this point in the history
…e scheduler initializations.
  • Loading branch information
yojs committed Aug 7, 2020
1 parent d526cf8 commit d5f98c1
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,14 @@ appContext, new MetricsDBProvider()

public static Thread startRcaTopLevelThread(final RcaController rcaController1,
final ThreadProvider threadProvider) {
return startRcaTopLevelThread(rcaController1, threadProvider, "");
}

public static Thread startRcaTopLevelThread(final RcaController rcaController1,
final ThreadProvider threadProvider,
String nodeName) {
Thread rcaControllerThread = threadProvider.createThreadForRunnable(() -> rcaController1.run(),
PerformanceAnalyzerThreads.RCA_CONTROLLER);
PerformanceAnalyzerThreads.RCA_CONTROLLER, nodeName);
rcaControllerThread.start();
return rcaControllerThread;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.Objects;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -88,6 +89,8 @@ public class RcaController {

private boolean rcaEnabledDefaultValue = false;

private final int WAIT_FOR_SCHED_START_SECS = 10;

// This needs to be volatile as the RcaConfPoller writes it but the Nanny reads it.
private volatile boolean rcaEnabled = false;

Expand Down Expand Up @@ -227,9 +230,17 @@ private void start() {
new SubscribeServerHandler(subscriptionManager, networkThreadPoolReference));

Thread rcaSchedulerThread = threadProvider.createThreadForRunnable(() -> rcaScheduler.start(),
PerformanceAnalyzerThreads.RCA_SCHEDULER);
PerformanceAnalyzerThreads.RCA_SCHEDULER,
copyAppContext.getMyInstanceDetails().getInstanceId().toString());

CountDownLatch schedulerStartLatch = new CountDownLatch(1);
rcaScheduler.setSchedulerTrackingLatch(schedulerStartLatch);
rcaSchedulerThread.start();
schedulerStartLatch.await(WAIT_FOR_SCHED_START_SECS, TimeUnit.SECONDS);

if (rcaScheduler.getState() != RcaSchedulerState.STATE_STARTED) {
LOG.error("RCA scheduler didn't start within {} seconds", WAIT_FOR_SCHED_START_SECS);
}
} catch (ClassNotFoundException
| NoSuchMethodException
| InvocationTargetException
Expand Down Expand Up @@ -311,7 +322,7 @@ public void run() {
}
tick++;
}
LOG.info("RcaController exits..");
LOG.error("RcaController exits..");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ protected synchronized Path rotate(long currentMillis) throws IOException {

Path ret;

LOG.info("About to rotate file: {} to {}", FILE_TO_ROTATE, targetFilePath);

// Fallback in rotating a file:
// try 1. Rotate the file, don't try to replace the destination file if one exists.
// try 2: Rotate the file now with replacement and add a log saying the destination file will be deleted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ enum RotationType {

Path path = Paths.get(dir, filenameParam);
fileRotate = new FileRotate(path, fileRotationTimeUnit, fileRotationPeriod, dateFormat);

LOG.info("Force Rotating DB file during startup.");
fileRotate.forceRotate(System.currentTimeMillis());

fileGC = new FileGC(Paths.get(dir), filenameParam, fileRotationTimeUnit, fileRotationPeriod,
Expand Down Expand Up @@ -205,8 +203,8 @@ private synchronized void rotateRegisterGarbageThenCreateNewDB(RotationType type
// If we are here that means the tryRotate or the forceRotate didn't throw exception and therefore,
// the current DBFile does not exist anymore. We therefore should create a new one.
if (fileRotate.getLastRotatedMillis() == currTime) {
LOG.info("Periodic file rotation type: {}", type.toString());
openNewDBFile();
LOG.info("Created a new DB file.");
}
}

Expand All @@ -220,19 +218,19 @@ private synchronized void rotateRegisterGarbageThenCreateNewDB(RotationType type
* corrupted.
* @throws IOException This is thrown if the attempt to create a new DB file fails.
*/
private <T extends ResourceFlowUnit> void writeFlowUnit(
private synchronized <T extends ResourceFlowUnit> void writeFlowUnit(
T flowUnit, String tableName) throws SQLException, IOException {
try {
tryWriteFlowUnit(flowUnit, tableName);
} catch (SQLException | DataAccessException e) {
LOG.info(
"RCA: Fail to write to table '{}', creating a new DB file and retrying write/create operation", tableName);
"RCA: Fail to write to table '{}', creating a new DB file and retrying write/create operation", tableName, e);
rotateRegisterGarbageThenCreateNewDB(RotationType.FORCE_ROTATE);
tryWriteFlowUnit(flowUnit, tableName);
}
}

private <T extends ResourceFlowUnit> void tryWriteFlowUnit(
private synchronized <T extends ResourceFlowUnit> void tryWriteFlowUnit(
T flowUnit, String nodeName) throws SQLException, DataAccessException {
String tableName = ResourceFlowUnit.RCA_TABLE_NAME;
if (!tableNames.contains(tableName)) {
Expand All @@ -251,14 +249,14 @@ private <T extends ResourceFlowUnit> void tryWriteFlowUnit(
}

/** recursively insert nested summary to sql tables */
private void writeSummary(
private synchronized void writeSummary(
GenericSummary summary,
String referenceTable,
String referenceTablePrimaryKeyFieldName,
int referenceTablePrimaryKeyFieldValue) throws SQLException {
String tableName = summary.getClass().getSimpleName();
if (!tableNames.contains(tableName)) {
LOG.info("RCA: Table '{}' does not exist. Creating one with columns: {}", tableName, summary.getSqlSchema());
LOG.info("RCA: Summary table '{}' does not exist. Creating one with columns: {}", tableName, summary.getSqlSchema());
createTable(tableName, summary.getSqlSchema(), referenceTable, referenceTablePrimaryKeyFieldName);
}
List<Object> values = summary.getSqlValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -80,6 +82,9 @@ class SQLitePersistor extends PersistorBase {
// It is needed during SQLite file rotation
@Override
synchronized void createNewDSLContext() {
if (create != null) {
create.close();
}
create = DSL.using(super.conn, SQLDialect.SQLITE);
jooqTableColumns = new HashMap<>();
}
Expand Down Expand Up @@ -201,13 +206,12 @@ public synchronized List<Result<Record>> getRecordsForAllTables() {
}

@Override
public Result<Record> getRecordsForTable(String tableName) {
public synchronized Result<Record> getRecordsForTable(String tableName) {
return getRecords(tableName);
}


@Override
public List<String> getAllPersistedRcas() {
public synchronized List<String> getAllPersistedRcas() {
List<String> tables = new ArrayList<>();
try {
tables =
Expand All @@ -220,7 +224,6 @@ public List<String> getAllPersistedRcas() {
return tables;
}


//read table content and convert it into JSON format
private synchronized String readTable(String tableName) {
String tableStr;
Expand Down Expand Up @@ -373,8 +376,10 @@ private synchronized JsonElement getNonTemperatureRcas(String rca) {
}
}
} catch (DataAccessException de) {
// it is totally fine if we fail to read some certain tables.
LOG.warn("Fail to read RCA : {}, query = {}, exceptions : {}", rca, rcaQuery.toString(), de);
if (!de.getMessage().contains("no such table")) {
// it is totally fine if we fail to read some certain tables.
LOG.warn("Fail to read RCA : {}.", rca, de);
}
}
JsonElement ret = null;
if (response != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -72,6 +73,8 @@ public class RCAScheduler {

private static final Logger LOG = LogManager.getLogger(RCAScheduler.class);

private CountDownLatch schedulerTrackingLatch;

public RCAScheduler(
List<ConnectedComponent> connectedComponents,
Queryable db,
Expand Down Expand Up @@ -111,10 +114,16 @@ public void start() {

if (scheduledPool == null) {
LOG.error("Couldn't start RCA scheduler. Executor pool is not set.");
if (schedulerTrackingLatch != null) {
schedulerTrackingLatch.countDown();
}
return;
}
if (role == NodeRole.UNKNOWN) {
LOG.error("Couldn't start RCA scheduler as the node role is UNKNOWN.");
if (schedulerTrackingLatch != null) {
schedulerTrackingLatch.countDown();
}
return;
}

Expand All @@ -130,6 +139,9 @@ public void start() {

schedulerState = RcaSchedulerState.STATE_STARTED;
LOG.info("RCA scheduler thread started successfully on node: {}", appContext.getMyInstanceDetails().getInstanceId());
if (schedulerTrackingLatch != null) {
schedulerTrackingLatch.countDown();
}

while (schedulerState == RcaSchedulerState.STATE_STARTED) {
try {
Expand Down Expand Up @@ -159,6 +171,7 @@ public void shutdown() {
LOG.info("Shutting down the scheduler..");
shutdownRequested = true;
scheduledPool.shutdown();
waitForShutdown(scheduledPool);
rcaSchedulerPeriodicExecutor.shutdown();
waitForShutdown(rcaSchedulerPeriodicExecutor);
try {
Expand All @@ -168,6 +181,9 @@ public void shutdown() {
"RCA: Error while closing the DB connection: {}::{}", e.getErrorCode(), e.getCause());
}
schedulerState = RcaSchedulerState.STATE_STOPPED;
if (schedulerTrackingLatch != null) {
schedulerTrackingLatch.countDown();
}
}

private void waitForShutdown(ExecutorService execPool) {
Expand All @@ -194,6 +210,10 @@ public NodeRole getRole() {
return role;
}

public void setSchedulerTrackingLatch(final CountDownLatch schedulerTrackingLatch) {
this.schedulerTrackingLatch = schedulerTrackingLatch;
}

@VisibleForTesting
public void setQueryable(Queryable queryable) {
this.db = queryable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ public class ThreadProvider {
* @return The thread with the wrapped runnable.
*/
public Thread createThreadForRunnable(final Runnable innerRunnable,
final PerformanceAnalyzerThreads paThread) {
final PerformanceAnalyzerThreads paThread, String threadNameAppender) {
StringBuilder threadName = new StringBuilder(paThread.toString());
if (!threadNameAppender.isEmpty()) {
threadName.append("-").append(threadNameAppender);
}
String threadNameStr = threadName.toString();

Thread t = new Thread(() -> {
try {
innerRunnable.run();
Expand All @@ -56,11 +62,16 @@ public Thread createThreadForRunnable(final Runnable innerRunnable,
}
}
StatsCollector.instance().logMetric(PA_THREADS_ENDED_METRIC_NAME);
LOG.info("Thread: {} completed.", paThread.toString());
}, paThread.toString());
LOG.info("Thread: {} completed.", threadNameStr);
}, threadNameStr);

LOG.info("Spun up a thread with name: {}", paThread.toString());
LOG.info("Spun up a thread with name: {}", threadNameStr);
StatsCollector.instance().logMetric(PA_THREADS_STARTED_METRIC_NAME);
return t;
}

public Thread createThreadForRunnable(final Runnable innerRunnable,
final PerformanceAnalyzerThreads paThread) {
return createThreadForRunnable(innerRunnable, paThread, "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ public Cluster(final ClusterType type, final File clusterDir, final boolean useH
this.hostList = new ArrayList<>();
this.roleToHostMap = new HashMap<>();
this.clusterDir = clusterDir;
this.rcaEnabled = true;
// We start off with the RCA turned off and turn it on only right before we
// invoke a test method.
this.rcaEnabled = false;
this.useHttps = useHttps;
this.threadProvider = new ThreadProvider();
this.exceptionQueue = new ArrayBlockingQueue<>(1);
Expand Down Expand Up @@ -186,7 +188,7 @@ private HostTag getTagForHostIdForHostTagAssignment(int hostId) {
throw new IllegalStateException("No cluster type matches");
}

public void createServersAndThreads() throws Exception {
public void createServersAndThreads() {
this.errorHandlingThread = PerformanceAnalyzerApp.startErrorHandlingThread(threadProvider, exceptionQueue);
for (Host host : hostList) {
host.createServersAndThreads(threadProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -139,14 +140,14 @@ private static File createHostDir(File clusterDir, HostTag hostTag) {
return hostFile;
}

public void createServersAndThreads(final ThreadProvider threadProvider) throws Exception {
public void createServersAndThreads(final ThreadProvider threadProvider) {
this.threadProvider = threadProvider;
Objects.requireNonNull(appContext.getClusterDetailsEventProcessor(),
"ClusterDetailsEventProcessor cannot be null in the AppContext");

rcaEnabledFile = Paths.get(hostDir.getAbsolutePath(), RcaController.RCA_ENABLED_CONF_FILE);
RcaSchedulerState state = rcaEnabled ? RcaSchedulerState.STATE_STARTED : RcaSchedulerState.STATE_STOPPED;
setRcaState(state);
setExpectedRcaState(state);

this.connectionManager = new GRPCConnectionManager(useHttps);
this.clientServers = PerformanceAnalyzerApp.createClientServers(connectionManager,
Expand Down Expand Up @@ -179,7 +180,7 @@ public void createServersAndThreads(final ThreadProvider threadProvider) throws
}

// We create a temporary file and then swap it for the rca.enabled file.
public void setRcaState(RcaSchedulerState rcaState) {
public void setExpectedRcaState(RcaSchedulerState rcaState) {
Path rcaEnabledTmp = Paths.get(rcaEnabledFile + ".tmp");
try (FileWriter f2 = new FileWriter(rcaEnabledTmp.toFile(), false /*To create a new file*/)) {
boolean value = true;
Expand Down Expand Up @@ -282,19 +283,29 @@ public void deleteHostDir() throws IOException {
}

public void stopRcaScheduler() throws Exception {
setRcaState(RcaSchedulerState.STATE_STOPPED);
rcaController.waitForRcaState(RcaSchedulerState.STATE_STOPPED);
LOG.info("RCA Scheduler STOPPED");
RCAScheduler sched = rcaController.getRcaScheduler();
CountDownLatch shutdownLatch = null;
if (sched != null) {
shutdownLatch = new CountDownLatch(1);
sched.setSchedulerTrackingLatch(shutdownLatch);
}
setExpectedRcaState(RcaSchedulerState.STATE_STOPPED);
if (shutdownLatch != null) {
shutdownLatch.await(10, TimeUnit.SECONDS);
}
LOG.info("RCA Scheduler is STOPPED by TestRunner on node: {}", myTag);
}

public void startRcaControllerThread() {
this.rcaControllerThread = PerformanceAnalyzerApp.startRcaTopLevelThread(rcaController, threadProvider);
this.rcaControllerThread = PerformanceAnalyzerApp.startRcaTopLevelThread(
rcaController,
threadProvider,
appContext.getMyInstanceDetails().getInstanceId().toString());
}

public void startRcaScheduler() throws Exception {
setRcaState(RcaSchedulerState.STATE_STARTED);
setExpectedRcaState(RcaSchedulerState.STATE_STARTED);
rcaController.waitForRcaState(RcaSchedulerState.STATE_STARTED);
LOG.info("RCA scheduler STARTED successfully on host: {}.", myTag);
}

public void updateRcaGraph(final Class rcaGraphClass)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
)
}
)
public class RcaItPoc {
public class RcaItPocCoLocatedMaster {
private TestApi api;

@Test
Expand Down
Loading

0 comments on commit d5f98c1

Please sign in to comment.