Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Persistance concurrency bug #323

Merged
merged 6 commits into from
Aug 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@
/bin/
/coverage-error.log
gradle.properties
/src/test/resources/tmp/file_gc/
/src/test/resources/tmp/file_rotate/
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ protected List<File> countBasedCleanup(List<File> files) throws IOException {
private void delete(File file) throws IOException {
Path path = Paths.get(file.toURI());
try {
Files.delete(path);
Files.deleteIfExists(path);
} catch (IOException e) {
LOG.error("Could not delete file: {}. Error: {}", file, e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.text.DateFormat;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -57,7 +58,7 @@ public class FileRotate {
* @return null if the file was not rotated because it is not old enough, or the name of the file
* after rotation.
*/
Path tryRotate(long currentTimeMillis) throws IOException {
synchronized Path tryRotate(long currentTimeMillis) throws IOException {
if (shouldRotate(currentTimeMillis)) {
return rotate(currentTimeMillis);
}
Expand All @@ -72,7 +73,7 @@ Path tryRotate(long currentTimeMillis) throws IOException {
*
* @return Path to the file after it is rotated or null if it ran into a problem trying to do so.
*/
Path forceRotate(long currentTimeMillis) throws IOException {
synchronized Path forceRotate(long currentTimeMillis) throws IOException {
return rotate(currentTimeMillis);
}

Expand All @@ -89,16 +90,29 @@ protected boolean shouldRotate(long currentTimeMillis) {
return timeUnitsPassed >= ROTATION_PERIOD;
}

private void tryDelete(Path file) throws IOException {
try {
if (!Files.deleteIfExists(file)) {
LOG.warn("The file to delete didn't exist: {}", file);
}
} catch (IOException ioException) {
LOG.error(ioException);
throw ioException;
}
}

/**
* Rotate the file.
*
* <p>Rotating a file renames it to filename.#current time in the date format specified#. For
* cases the file could not be renamed, we attempt to delete the file. If the file could not be
* deleted either, we throw an IOException for the caller to handle or take the necessary action.
*
* @return Returns the path to the file after it was rotated.
* @return Returns the path to the file after it was rotated, this is so that the GC can add it to the list.
*
* @throws IOException when it can't even delete the current file.
*/
protected Path rotate(long currentMillis) throws IOException {
protected synchronized Path rotate(long currentMillis) throws IOException {
if (!FILE_TO_ROTATE.toFile().exists()) {
return null;
}
Expand All @@ -111,23 +125,47 @@ protected Path rotate(long currentMillis) throws IOException {
targetFileName.append(FILE_PART_SEPARATOR).append(ROTATED_FILE_FORMAT.format(currentMillis));

Path targetFilePath = Paths.get(dir, targetFileName.toString());

Path ret;

// Fallback in rotating a file:
// try 1. Rotate the file, don't try to replace the destination file if one exists.
// try 2: Rotate the file now with replacement and add a log saying the destination file will be deleted.
// try 3: Delete the file, don't rotate. The caller will create a new file and start over.
// try 4: If the delete fails, all bets are off, throw an exception and let the caller decide.
try {
Files.move(FILE_TO_ROTATE, targetFilePath);
lastRotatedMillis = System.currentTimeMillis();
ret = Files.move(FILE_TO_ROTATE, targetFilePath, StandardCopyOption.ATOMIC_MOVE);
} catch (FileAlreadyExistsException fae) {
LOG.error(fae);
} catch (IOException e) {
LOG.error(
"Could not RENAME file '{}' to '{}'. Error: {}", FILENAME, targetFileName, e.getCause());
LOG.error("Deleting file '{}' or else we cannot rotate the current {}", targetFilePath, FILE_TO_ROTATE);
if (!Files.deleteIfExists(targetFilePath)) {
LOG.error("Could not delete file: " + targetFilePath);
}
try {
LOG.info("Attempting to delete file '{}'", FILENAME);
Files.deleteIfExists(FILE_TO_ROTATE);
} catch (IOException ex) {
LOG.error("Could not DELETE file '{}'. Error: {}", FILENAME, ex.getCause());
throw ex;
ret = Files.move(FILE_TO_ROTATE, targetFilePath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
} catch (Exception ex) {
LOG.error(ex);
LOG.error("Deleting file: {}", FILE_TO_ROTATE);
tryDelete(FILE_TO_ROTATE);

// Because we are deleting the current file, there is nothing for the GC to add.
ret = null;
}
return null;
} catch (IOException e) {
LOG.error("Could not RENAME file '{}' to '{}'. Error: {}", FILE_TO_ROTATE, targetFilePath, e);
tryDelete(FILE_TO_ROTATE);

// Because we are deleting the current file, there is nothing for the GC to add.
ret = null;
}
return targetFilePath;

// If we are here then we have successfully rotated or deleted the FILE_TO_ROTATE.
// In both the cases a new file will be created by the caller and that file should exist
// for the ROTATION_PERIOD.
lastRotatedMillis = currentMillis;
return ret;
}

public long getLastRotatedMillis() {
return lastRotatedMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public synchronized void close() throws SQLException {
}
}

abstract void createTable(String tableName, List<Field<?>> columns);
abstract void createTable(String tableName, List<Field<?>> columns) throws SQLException;

abstract void createTable(
String tableName,
Expand All @@ -115,7 +115,7 @@ abstract void createTable(

// Not required for now.
@Override
public List<ResourceFlowUnit> read(Node<?> node) {
public synchronized List<ResourceFlowUnit> read(Node<?> node) {
return null;
}

Expand All @@ -139,7 +139,7 @@ public synchronized JsonElement read(String rca) {
return rcaJson;
}

public synchronized void openNewDBFile() throws SQLException {
private synchronized void openNewDBFile() throws SQLException {
this.fileCreateTime = new Date(System.currentTimeMillis());
this.filename = Paths.get(dir, filenameParam).toString();
this.tableNames = new HashSet<>();
Expand Down Expand Up @@ -182,18 +182,24 @@ public synchronized <T extends ResourceFlowUnit> void write(Node<?> node, T flow
}
}

private void rotateRegisterGarbageThenCreateNewDB(RotationType type) throws IOException, SQLException {
private synchronized void rotateRegisterGarbageThenCreateNewDB(RotationType type) throws IOException, SQLException {
Path rotatedFile = null;
long currTime = System.currentTimeMillis();
switch (type) {
case FORCE_ROTATE:
rotatedFile = fileRotate.forceRotate(System.currentTimeMillis());
rotatedFile = fileRotate.forceRotate(currTime);
break;
case TRY_ROTATE:
rotatedFile = fileRotate.tryRotate(System.currentTimeMillis());
rotatedFile = fileRotate.tryRotate(currTime);
break;
}
if (rotatedFile != null) {
fileGC.eligibleForGc(rotatedFile.toFile().getName());
}

// If we are here that means the tryRotate or the forceRotate didn't throw exception and therefore,
// the current DBFile does not exist anymore. We therefore should create a new one.
if (fileRotate.getLastRotatedMillis() == currTime) {
openNewDBFile();
}
}
Expand Down Expand Up @@ -224,12 +230,8 @@ private <T extends ResourceFlowUnit> void tryWriteFlowUnit(
T flowUnit, String nodeName) throws SQLException, DataAccessException {
String tableName = ResourceFlowUnit.RCA_TABLE_NAME;
if (!tableNames.contains(tableName)) {
LOG.info(
"RCA: Table '{}' does not exist. Creating one with columns: {}",
tableName,
flowUnit.getSqlSchema());
LOG.info("RCA: Table '{}' does not exist. Creating one with columns: {}", tableName, flowUnit.getSqlSchema());
createTable(tableName, flowUnit.getSqlSchema());
tableNames.add(tableName);
}
int lastPrimaryKey = insertRow(tableName, flowUnit.getSqlValue(nodeName));

Expand All @@ -250,13 +252,8 @@ private void writeSummary(
int referenceTablePrimaryKeyFieldValue) throws SQLException {
String tableName = summary.getClass().getSimpleName();
if (!tableNames.contains(tableName)) {
LOG.info(
"RCA: Table '{}' does not exist. Creating one with columns: {}",
tableName,
summary.getSqlSchema());
createTable(
tableName, summary.getSqlSchema(), referenceTable, referenceTablePrimaryKeyFieldName);
tableNames.add(tableName);
LOG.info("RCA: Table '{}' does not exist. Creating one with columns: {}", tableName, summary.getSqlSchema());
createTable(tableName, summary.getSqlSchema(), referenceTable, referenceTablePrimaryKeyFieldName);
}
List<Object> values = summary.getSqlValue();
values.add(Integer.valueOf(referenceTablePrimaryKeyFieldValue));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -79,15 +80,27 @@ synchronized void createNewDSLContext() {
}

@Override
synchronized void createTable(String tableName, List<Field<?>> columns) {
synchronized void createTable(String tableName, List<Field<?>> columns) throws SQLException {
CreateTableConstraintStep constraintStep = create.createTable(tableName)
//sqlite does not support identity. use plain sql string instead.
.column(DSL.field(getPrimaryKeyColumnName(tableName) + PRIMARY_KEY_AUTOINCREMENT_POSTFIX))
.columns(columns);

LOG.debug("table created: {}", constraintStep.toString());
constraintStep.execute();
try {
constraintStep.execute();
LOG.debug("Successfully created table: {}", tableName);
} catch (DataAccessException ex) {
String msg = "table " + tableName + " already exists";
if (ex.getMessage().contains(msg)) {
LOG.debug(ex.getMessage());
} else {
LOG.error(ex);
throw new SQLException(ex);
}
}
tableNames.add(tableName);
jooqTableColumns.put(tableName, columns);
LOG.debug("Added table '{}' and its columns: '{}' to in-memory registry.", tableName, columns);
}

/**
Expand All @@ -98,37 +111,58 @@ synchronized void createTable(String tableName, List<Field<?>> columns, String r
String referenceTablePrimaryKeyFieldName) throws SQLException {
Field foreignKeyField = DSL.field(referenceTablePrimaryKeyFieldName, Integer.class);
columns.add(foreignKeyField);
Table referenceTable = DSL.table(referenceTableName);
CreateTableConstraintStep constraintStep = create.createTable(tableName)
.column(DSL.field(getPrimaryKeyColumnName(tableName) + PRIMARY_KEY_AUTOINCREMENT_POSTFIX))
.columns(columns)
.constraints(DSL.constraint(foreignKeyField.getName() + "_FK").foreignKey(foreignKeyField)
.references(referenceTable, DSL.field(referenceTablePrimaryKeyFieldName)));

LOG.debug("table with fk created: {}", constraintStep.toString());
try {
LOG.debug("Trying to create a summary table: {} that references {}", tableName, referenceTableName);
Table referenceTable = DSL.table(referenceTableName);
CreateTableConstraintStep constraintStep = create.createTable(tableName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume creating the SQL statement itself does not throw any exception. Can we leave this part out of the try block ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DSLContext can be uninitialized if we are not using synchronization between the threads. I saw a case where the table was added to hastable but the columns for the table were missing. We gather some date to create the createStatement and it the gathering part we can run into exceptions.
These cases are fixed now but it would be good to catch and log exception. But you are right, the scope of exceptions is larger and so I have added code to catch exception and not just DataAcessException.

.column(DSL.field(getPrimaryKeyColumnName(tableName) + PRIMARY_KEY_AUTOINCREMENT_POSTFIX))
.columns(columns)
.constraints(DSL.constraint(foreignKeyField.getName() + "_FK").foreignKey(foreignKeyField)
.references(referenceTable, DSL.field(referenceTablePrimaryKeyFieldName)));
constraintStep.execute();
jooqTableColumns.put(tableName, columns);
} catch (Exception e) {
LOG.error("Failed to create table {}", tableName);
throw new SQLException();
LOG.debug("table with fk created: {}", constraintStep.toString());
} catch (DataAccessException e) {
String msg = "table " + tableName + " already exists";
if (e.getMessage().contains(msg)) {
LOG.debug(e.getMessage());
} else {
LOG.error("Error creating table: {}", tableName, e);
throw new SQLException(e);
}
} catch (Exception ex) {
LOG.error(ex);
throw new SQLException(ex);
}
tableNames.add(tableName);
jooqTableColumns.put(tableName, columns);
}

@Override
synchronized int insertRow(String tableName, List<Object> row) throws SQLException {
int lastPrimaryKey = -1;
String sqlQuery = "SELECT " + LAST_INSERT_ROWID;
InsertValuesStepN insertValuesStepN = create.insertInto(DSL.table(tableName))
.columns(jooqTableColumns.get(tableName))

Objects.requireNonNull(create, "DSLContext cannot be null");
Table<Record> table = DSL.table(tableName);
List<Field<?>> columnsForTable = jooqTableColumns.get(tableName);
if (columnsForTable == null) {
LOG.error("NO columns found for table: {}. Tables: {}, columns: {}", tableName, tableNames, jooqTableColumns);
throw new SQLException("No columns exist for table.");
}

InsertValuesStepN insertValuesStepN = create
.insertInto(table)
.columns(columnsForTable)
.values(row);
LOG.debug("sql insert: {}", insertValuesStepN.toString());

try {
insertValuesStepN.execute();
LOG.debug("sql insert: {}", insertValuesStepN.toString());
lastPrimaryKey = create.fetch(sqlQuery).get(0).get(LAST_INSERT_ROWID, Integer.class);
} catch (Exception e) {
LOG.error("Failed to insert into the table {}", tableName);
throw new SQLException();
LOG.error("Failed to insert into the table {}", tableName, e);
throw new SQLException(e);
}
LOG.debug("most recently inserted primary key = {}", lastPrimaryKey);
return lastPrimaryKey;
Expand Down Expand Up @@ -227,7 +261,7 @@ private JsonElement constructFullTemperatureProfile() {
return rcaResponseJson;
}

private void readSummary(GenericSummary upperLevelSummary, int upperLevelPrimaryKey) {
private synchronized void readSummary(GenericSummary upperLevelSummary, int upperLevelPrimaryKey) {
String upperLevelTable = upperLevelSummary.getTableName();

// stop the recursion here if the summary does not have any nested summary table.
Expand Down Expand Up @@ -260,7 +294,7 @@ private void readSummary(GenericSummary upperLevelSummary, int upperLevelPrimary
}
}

private JsonElement getTemperatureRca(String rca) {
private synchronized JsonElement getTemperatureRca(String rca) {
JsonElement temperatureRcaJson;
switch (rca) {
case SQLiteQueryUtils.ALL_TEMPERATURE_DIMENSIONS:
Expand All @@ -272,7 +306,7 @@ private JsonElement getTemperatureRca(String rca) {
return temperatureRcaJson;
}

private JsonElement getNonTemperatureRcas(String rca) {
private synchronized JsonElement getNonTemperatureRcas(String rca) {
RcaResponse response = null;
Field<Integer> primaryKeyField = DSL.field(
SQLiteQueryUtils.getPrimaryKeyColumnName(ResourceFlowUnit.RCA_TABLE_NAME), Integer.class);
Expand Down Expand Up @@ -310,7 +344,7 @@ public synchronized JsonElement readRca(String rca) {
return json;
}

private JsonElement readTemperatureProfileRca(String rca) {
private synchronized JsonElement readTemperatureProfileRca(String rca) {
RcaResponse response = null;
Field<Integer> primaryKeyField = DSL.field(
SQLiteQueryUtils.getPrimaryKeyColumnName(ResourceFlowUnit.RCA_TABLE_NAME), Integer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ public void rotate() throws IOException {
Files.createFile(fileToRotate);
Assert.assertTrue(fileToRotate.toFile().exists());
fileRotate.rotate(currentMillis);
Assert.assertEquals("File should not rotate if the rotation target already exists",
lastRotatedMillis, fileRotate.lastRotatedMillis);
Assert.assertTrue("File should not rotate if the rotation target already exists",
currentMillis == fileRotate.lastRotatedMillis);
}

@Test
Expand Down
Loading