Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
some refactoring in persistance layer for better error messages and e…
Browse files Browse the repository at this point in the history
…rror catching

Added a test that does concurrent writes to the DB
  • Loading branch information
yojs committed Jul 30, 2020
1 parent b603245 commit 02726fa
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ protected List<File> countBasedCleanup(List<File> files) throws IOException {
private void delete(File file) throws IOException {
Path path = Paths.get(file.toURI());
try {
Files.delete(path);
Files.deleteIfExists(path);
} catch (IOException e) {
LOG.error("Could not delete file: {}. Error: {}", file, e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.text.DateFormat;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -57,7 +58,7 @@ public class FileRotate {
* @return null if the file was not rotated because it is not old enough, or the name of the file
* after rotation.
*/
Path tryRotate(long currentTimeMillis) throws IOException {
synchronized Path tryRotate(long currentTimeMillis) throws IOException {
if (shouldRotate(currentTimeMillis)) {
return rotate(currentTimeMillis);
}
Expand All @@ -72,7 +73,7 @@ Path tryRotate(long currentTimeMillis) throws IOException {
*
* @return Path to the file after it is rotated or null if it ran into a problem trying to do so.
*/
Path forceRotate(long currentTimeMillis) throws IOException {
synchronized Path forceRotate(long currentTimeMillis) throws IOException {
return rotate(currentTimeMillis);
}

Expand All @@ -98,7 +99,7 @@ protected boolean shouldRotate(long currentTimeMillis) {
*
* @return Returns the path to the file after it was rotated.
*/
protected Path rotate(long currentMillis) throws IOException {
protected synchronized Path rotate(long currentMillis) throws IOException {
if (!FILE_TO_ROTATE.toFile().exists()) {
return null;
}
Expand All @@ -115,7 +116,14 @@ protected Path rotate(long currentMillis) throws IOException {
Files.move(FILE_TO_ROTATE, targetFilePath);
lastRotatedMillis = System.currentTimeMillis();
} catch (FileAlreadyExistsException fae) {
LOG.error(fae);
if (!Files.deleteIfExists(targetFilePath)) {
LOG.error("Could not delete file: " + targetFilePath);
}
try {
Files.move(FILE_TO_ROTATE, targetFilePath, StandardCopyOption.REPLACE_EXISTING);
} catch (Exception ex) {
LOG.error(ex);
}
} catch (IOException e) {
LOG.error(
"Could not RENAME file '{}' to '{}'. Error: {}", FILENAME, targetFileName, e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ abstract void createTable(

// Not required for now.
@Override
public List<ResourceFlowUnit> read(Node<?> node) {
public synchronized List<ResourceFlowUnit> read(Node<?> node) {
return null;
}

Expand All @@ -139,7 +139,7 @@ public synchronized JsonElement read(String rca) {
return rcaJson;
}

public synchronized void openNewDBFile() throws SQLException {
private synchronized void openNewDBFile() throws SQLException {
this.fileCreateTime = new Date(System.currentTimeMillis());
this.filename = Paths.get(dir, filenameParam).toString();
this.tableNames = new HashSet<>();
Expand Down Expand Up @@ -182,7 +182,7 @@ public synchronized <T extends ResourceFlowUnit> void write(Node<?> node, T flow
}
}

private void rotateRegisterGarbageThenCreateNewDB(RotationType type) throws IOException, SQLException {
private synchronized void rotateRegisterGarbageThenCreateNewDB(RotationType type) throws IOException, SQLException {
Path rotatedFile = null;
switch (type) {
case FORCE_ROTATE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,16 @@ synchronized void createTable(String tableName, List<Field<?>> columns) {
.column(DSL.field(getPrimaryKeyColumnName(tableName) + PRIMARY_KEY_AUTOINCREMENT_POSTFIX))
.columns(columns);

LOG.debug("table created: {}", constraintStep.toString());
constraintStep.execute();
try {
constraintStep.execute();
LOG.debug("table created: {}", constraintStep.toString());
} catch (DataAccessException ex) {
String msg = "table " + tableName + " already exists";
if (!ex.getMessage().contains(msg)) {
LOG.error(ex);
throw ex;
}
}
jooqTableColumns.put(tableName, columns);
}

Expand All @@ -98,37 +106,40 @@ synchronized void createTable(String tableName, List<Field<?>> columns, String r
String referenceTablePrimaryKeyFieldName) throws SQLException {
Field foreignKeyField = DSL.field(referenceTablePrimaryKeyFieldName, Integer.class);
columns.add(foreignKeyField);
Table referenceTable = DSL.table(referenceTableName);
CreateTableConstraintStep constraintStep = create.createTable(tableName)
.column(DSL.field(getPrimaryKeyColumnName(tableName) + PRIMARY_KEY_AUTOINCREMENT_POSTFIX))
.columns(columns)
.constraints(DSL.constraint(foreignKeyField.getName() + "_FK").foreignKey(foreignKeyField)
.references(referenceTable, DSL.field(referenceTablePrimaryKeyFieldName)));

LOG.debug("table with fk created: {}", constraintStep.toString());
try {
Table referenceTable = DSL.table(referenceTableName);
CreateTableConstraintStep constraintStep = create.createTable(tableName)
.column(DSL.field(getPrimaryKeyColumnName(tableName) + PRIMARY_KEY_AUTOINCREMENT_POSTFIX))
.columns(columns)
.constraints(DSL.constraint(foreignKeyField.getName() + "_FK").foreignKey(foreignKeyField)
.references(referenceTable, DSL.field(referenceTablePrimaryKeyFieldName)));
constraintStep.execute();
LOG.debug("table with fk created: {}", constraintStep.toString());
jooqTableColumns.put(tableName, columns);
} catch (Exception e) {
LOG.error("Failed to create table {}", tableName);
throw new SQLException();
} catch (DataAccessException e) {
String msg = "table " + tableName + " already exists";
if (!e.getMessage().contains(msg)) {
LOG.error(e);
throw new SQLException(e);
}
}
}

@Override
synchronized int insertRow(String tableName, List<Object> row) throws SQLException {
int lastPrimaryKey = -1;
String sqlQuery = "SELECT " + LAST_INSERT_ROWID;
InsertValuesStepN insertValuesStepN = create.insertInto(DSL.table(tableName))
.columns(jooqTableColumns.get(tableName))
.values(row);
LOG.debug("sql insert: {}", insertValuesStepN.toString());
try {
InsertValuesStepN insertValuesStepN = create.insertInto(DSL.table(tableName))
.columns(jooqTableColumns.get(tableName))
.values(row);
insertValuesStepN.execute();
LOG.debug("sql insert: {}", insertValuesStepN.toString());
lastPrimaryKey = create.fetch(sqlQuery).get(0).get(LAST_INSERT_ROWID, Integer.class);
} catch (Exception e) {
LOG.error("Failed to insert into the table {}", tableName);
throw new SQLException();
LOG.error("Failed to insert into the table {}", tableName, e);
throw new SQLException(e);
}
LOG.debug("most recently inserted primary key = {}", lastPrimaryKey);
return lastPrimaryKey;
Expand Down Expand Up @@ -227,7 +238,7 @@ private JsonElement constructFullTemperatureProfile() {
return rcaResponseJson;
}

private void readSummary(GenericSummary upperLevelSummary, int upperLevelPrimaryKey) {
private synchronized void readSummary(GenericSummary upperLevelSummary, int upperLevelPrimaryKey) {
String upperLevelTable = upperLevelSummary.getTableName();

// stop the recursion here if the summary does not have any nested summary table.
Expand Down Expand Up @@ -260,7 +271,7 @@ private void readSummary(GenericSummary upperLevelSummary, int upperLevelPrimary
}
}

private JsonElement getTemperatureRca(String rca) {
private synchronized JsonElement getTemperatureRca(String rca) {
JsonElement temperatureRcaJson;
switch (rca) {
case SQLiteQueryUtils.ALL_TEMPERATURE_DIMENSIONS:
Expand All @@ -272,7 +283,7 @@ private JsonElement getTemperatureRca(String rca) {
return temperatureRcaJson;
}

private JsonElement getNonTemperatureRcas(String rca) {
private synchronized JsonElement getNonTemperatureRcas(String rca) {
RcaResponse response = null;
Field<Integer> primaryKeyField = DSL.field(
SQLiteQueryUtils.getPrimaryKeyColumnName(ResourceFlowUnit.RCA_TABLE_NAME), Integer.class);
Expand Down Expand Up @@ -310,7 +321,7 @@ public synchronized JsonElement readRca(String rca) {
return json;
}

private JsonElement readTemperatureProfileRca(String rca) {
private synchronized JsonElement readTemperatureProfileRca(String rca) {
RcaResponse response = null;
Field<Integer> primaryKeyField = DSL.field(
SQLiteQueryUtils.getPrimaryKeyColumnName(ResourceFlowUnit.RCA_TABLE_NAME), Integer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.WildcardFileFilter;
Expand Down Expand Up @@ -80,7 +82,8 @@ public ResourceFlowUnit operate() {
}

@Override
public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {}
public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
}
}

@Test
Expand All @@ -104,7 +107,7 @@ public void write() throws IOException, SQLException, InterruptedException {
// The first write, this should create only one file as there is nothing to rotate.
sqlite.write(rca, rfu);
Assert.assertEquals(1,
testLocation.toFile().list(new WildcardFileFilter(baseFilename + "*")).length);
testLocation.toFile().list(new WildcardFileFilter(baseFilename + "*")).length);
Assert.assertTrue(Paths.get(testLocation.toString(), baseFilename).toFile().exists());
Thread.sleep(1000);

Expand All @@ -126,4 +129,47 @@ public void write() throws IOException, SQLException, InterruptedException {
Assert.assertTrue(readTableStr.contains("TestRca"));
Assert.assertTrue(readTableStr.contains("HotResourceSummary"));
}

@Test
public void concurrentWriteAndRotate() throws IOException, SQLException {
ResourceContext context = new ResourceContext(Resources.State.UNHEALTHY);
HotResourceSummary summary =
new HotResourceSummary(
ResourceUtil.OLD_GEN_HEAP_USAGE,
70,
71,
60);
ResourceFlowUnit rfu = new ResourceFlowUnit(System.currentTimeMillis(), context, summary, true);
Node rca = new TestRca();
SQLitePersistor sqlite =
new SQLitePersistor(
testLocation.toString(), baseFilename, String.valueOf(1), TimeUnit.MICROSECONDS, 0);


int numThreads = 100;
int numWrites = 50;

List<Thread> threads = new ArrayList<>();

for (int i = 0; i < numThreads; i++) {
threads.add(new Thread(() -> {
for (int j = 0; j < numWrites; j++) {
try {
sqlite.write(rca, rfu);
} catch (SQLException | IOException throwables) {
throwables.printStackTrace();
}
}
}));
}

for (Thread th : threads) {
th.start();
try {
th.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Binary file removed src/test/resources/tmp/file_rotate/rca.test.file
Binary file not shown.
Binary file not shown.

0 comments on commit 02726fa

Please sign in to comment.