From 02726fa42899d47e867bc804daff85a38f9db7be Mon Sep 17 00:00:00 2001 From: Joydeep Sinha Date: Thu, 30 Jul 2020 11:27:55 -0700 Subject: [PATCH] some refactoring in persistance layer for better error messages and error catching Added a test that does concurrent writes to the DB --- .../rca/persistence/FileGC.java | 2 +- .../rca/persistence/FileRotate.java | 16 +++-- .../rca/persistence/PersistorBase.java | 6 +- .../rca/persistence/SQLitePersistor.java | 55 +++++++++++------- .../rca/persistence/SQLitePersistorTest.java | 50 +++++++++++++++- .../resources/tmp/file_rotate/rca.test.file | Bin 4096 -> 0 bytes .../rca.test.file.2020-07-20-11-21-23 | Bin 4096 -> 0 bytes 7 files changed, 97 insertions(+), 32 deletions(-) delete mode 100644 src/test/resources/tmp/file_rotate/rca.test.file delete mode 100644 src/test/resources/tmp/file_rotate/rca.test.file.2020-07-20-11-21-23 diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileGC.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileGC.java index bc94b0b9f..4dcd871b1 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileGC.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileGC.java @@ -144,7 +144,7 @@ protected List countBasedCleanup(List files) throws IOException { private void delete(File file) throws IOException { Path path = Paths.get(file.toURI()); try { - Files.delete(path); + Files.deleteIfExists(path); } catch (IOException e) { LOG.error("Could not delete file: {}. Error: {}", file, e); throw e; diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileRotate.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileRotate.java index 1ce0d6463..ac6bc99b6 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileRotate.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/FileRotate.java @@ -20,6 +20,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; import java.text.DateFormat; import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; @@ -57,7 +58,7 @@ public class FileRotate { * @return null if the file was not rotated because it is not old enough, or the name of the file * after rotation. */ - Path tryRotate(long currentTimeMillis) throws IOException { + synchronized Path tryRotate(long currentTimeMillis) throws IOException { if (shouldRotate(currentTimeMillis)) { return rotate(currentTimeMillis); } @@ -72,7 +73,7 @@ Path tryRotate(long currentTimeMillis) throws IOException { * * @return Path to the file after it is rotated or null if it ran into a problem trying to do so. */ - Path forceRotate(long currentTimeMillis) throws IOException { + synchronized Path forceRotate(long currentTimeMillis) throws IOException { return rotate(currentTimeMillis); } @@ -98,7 +99,7 @@ protected boolean shouldRotate(long currentTimeMillis) { * * @return Returns the path to the file after it was rotated. */ - protected Path rotate(long currentMillis) throws IOException { + protected synchronized Path rotate(long currentMillis) throws IOException { if (!FILE_TO_ROTATE.toFile().exists()) { return null; } @@ -115,7 +116,14 @@ protected Path rotate(long currentMillis) throws IOException { Files.move(FILE_TO_ROTATE, targetFilePath); lastRotatedMillis = System.currentTimeMillis(); } catch (FileAlreadyExistsException fae) { - LOG.error(fae); + if (!Files.deleteIfExists(targetFilePath)) { + LOG.error("Could not delete file: " + targetFilePath); + } + try { + Files.move(FILE_TO_ROTATE, targetFilePath, StandardCopyOption.REPLACE_EXISTING); + } catch (Exception ex) { + LOG.error(ex); + } } catch (IOException e) { LOG.error( "Could not RENAME file '{}' to '{}'. Error: {}", FILENAME, targetFileName, e.getCause()); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java index e953aa60d..5a2f8b916 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistorBase.java @@ -115,7 +115,7 @@ abstract void createTable( // Not required for now. @Override - public List read(Node node) { + public synchronized List read(Node node) { return null; } @@ -139,7 +139,7 @@ public synchronized JsonElement read(String rca) { return rcaJson; } - public synchronized void openNewDBFile() throws SQLException { + private synchronized void openNewDBFile() throws SQLException { this.fileCreateTime = new Date(System.currentTimeMillis()); this.filename = Paths.get(dir, filenameParam).toString(); this.tableNames = new HashSet<>(); @@ -182,7 +182,7 @@ public synchronized void write(Node node, T flow } } - private void rotateRegisterGarbageThenCreateNewDB(RotationType type) throws IOException, SQLException { + private synchronized void rotateRegisterGarbageThenCreateNewDB(RotationType type) throws IOException, SQLException { Path rotatedFile = null; switch (type) { case FORCE_ROTATE: diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java index 458bc847f..be2a00ca3 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistor.java @@ -85,8 +85,16 @@ synchronized void createTable(String tableName, List> columns) { .column(DSL.field(getPrimaryKeyColumnName(tableName) + PRIMARY_KEY_AUTOINCREMENT_POSTFIX)) .columns(columns); - LOG.debug("table created: {}", constraintStep.toString()); - constraintStep.execute(); + try { + constraintStep.execute(); + LOG.debug("table created: {}", constraintStep.toString()); + } catch (DataAccessException ex) { + String msg = "table " + tableName + " already exists"; + if (!ex.getMessage().contains(msg)) { + LOG.error(ex); + throw ex; + } + } jooqTableColumns.put(tableName, columns); } @@ -98,20 +106,23 @@ synchronized void createTable(String tableName, List> columns, String r String referenceTablePrimaryKeyFieldName) throws SQLException { Field foreignKeyField = DSL.field(referenceTablePrimaryKeyFieldName, Integer.class); columns.add(foreignKeyField); - Table referenceTable = DSL.table(referenceTableName); - CreateTableConstraintStep constraintStep = create.createTable(tableName) - .column(DSL.field(getPrimaryKeyColumnName(tableName) + PRIMARY_KEY_AUTOINCREMENT_POSTFIX)) - .columns(columns) - .constraints(DSL.constraint(foreignKeyField.getName() + "_FK").foreignKey(foreignKeyField) - .references(referenceTable, DSL.field(referenceTablePrimaryKeyFieldName))); - LOG.debug("table with fk created: {}", constraintStep.toString()); try { + Table referenceTable = DSL.table(referenceTableName); + CreateTableConstraintStep constraintStep = create.createTable(tableName) + .column(DSL.field(getPrimaryKeyColumnName(tableName) + PRIMARY_KEY_AUTOINCREMENT_POSTFIX)) + .columns(columns) + .constraints(DSL.constraint(foreignKeyField.getName() + "_FK").foreignKey(foreignKeyField) + .references(referenceTable, DSL.field(referenceTablePrimaryKeyFieldName))); constraintStep.execute(); + LOG.debug("table with fk created: {}", constraintStep.toString()); jooqTableColumns.put(tableName, columns); - } catch (Exception e) { - LOG.error("Failed to create table {}", tableName); - throw new SQLException(); + } catch (DataAccessException e) { + String msg = "table " + tableName + " already exists"; + if (!e.getMessage().contains(msg)) { + LOG.error(e); + throw new SQLException(e); + } } } @@ -119,16 +130,16 @@ synchronized void createTable(String tableName, List> columns, String r synchronized int insertRow(String tableName, List row) throws SQLException { int lastPrimaryKey = -1; String sqlQuery = "SELECT " + LAST_INSERT_ROWID; - InsertValuesStepN insertValuesStepN = create.insertInto(DSL.table(tableName)) - .columns(jooqTableColumns.get(tableName)) - .values(row); - LOG.debug("sql insert: {}", insertValuesStepN.toString()); try { + InsertValuesStepN insertValuesStepN = create.insertInto(DSL.table(tableName)) + .columns(jooqTableColumns.get(tableName)) + .values(row); insertValuesStepN.execute(); + LOG.debug("sql insert: {}", insertValuesStepN.toString()); lastPrimaryKey = create.fetch(sqlQuery).get(0).get(LAST_INSERT_ROWID, Integer.class); } catch (Exception e) { - LOG.error("Failed to insert into the table {}", tableName); - throw new SQLException(); + LOG.error("Failed to insert into the table {}", tableName, e); + throw new SQLException(e); } LOG.debug("most recently inserted primary key = {}", lastPrimaryKey); return lastPrimaryKey; @@ -227,7 +238,7 @@ private JsonElement constructFullTemperatureProfile() { return rcaResponseJson; } - private void readSummary(GenericSummary upperLevelSummary, int upperLevelPrimaryKey) { + private synchronized void readSummary(GenericSummary upperLevelSummary, int upperLevelPrimaryKey) { String upperLevelTable = upperLevelSummary.getTableName(); // stop the recursion here if the summary does not have any nested summary table. @@ -260,7 +271,7 @@ private void readSummary(GenericSummary upperLevelSummary, int upperLevelPrimary } } - private JsonElement getTemperatureRca(String rca) { + private synchronized JsonElement getTemperatureRca(String rca) { JsonElement temperatureRcaJson; switch (rca) { case SQLiteQueryUtils.ALL_TEMPERATURE_DIMENSIONS: @@ -272,7 +283,7 @@ private JsonElement getTemperatureRca(String rca) { return temperatureRcaJson; } - private JsonElement getNonTemperatureRcas(String rca) { + private synchronized JsonElement getNonTemperatureRcas(String rca) { RcaResponse response = null; Field primaryKeyField = DSL.field( SQLiteQueryUtils.getPrimaryKeyColumnName(ResourceFlowUnit.RCA_TABLE_NAME), Integer.class); @@ -310,7 +321,7 @@ public synchronized JsonElement readRca(String rca) { return json; } - private JsonElement readTemperatureProfileRca(String rca) { + private synchronized JsonElement readTemperatureProfileRca(String rca) { RcaResponse response = null; Field primaryKeyField = DSL.field( SQLiteQueryUtils.getPrimaryKeyColumnName(ResourceFlowUnit.RCA_TABLE_NAME), Integer.class); diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistorTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistorTest.java index 529e8da41..39594b057 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistorTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistorTest.java @@ -29,6 +29,8 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.WildcardFileFilter; @@ -80,7 +82,8 @@ public ResourceFlowUnit operate() { } @Override - public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {} + public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) { + } } @Test @@ -104,7 +107,7 @@ public void write() throws IOException, SQLException, InterruptedException { // The first write, this should create only one file as there is nothing to rotate. sqlite.write(rca, rfu); Assert.assertEquals(1, - testLocation.toFile().list(new WildcardFileFilter(baseFilename + "*")).length); + testLocation.toFile().list(new WildcardFileFilter(baseFilename + "*")).length); Assert.assertTrue(Paths.get(testLocation.toString(), baseFilename).toFile().exists()); Thread.sleep(1000); @@ -126,4 +129,47 @@ public void write() throws IOException, SQLException, InterruptedException { Assert.assertTrue(readTableStr.contains("TestRca")); Assert.assertTrue(readTableStr.contains("HotResourceSummary")); } + + @Test + public void concurrentWriteAndRotate() throws IOException, SQLException { + ResourceContext context = new ResourceContext(Resources.State.UNHEALTHY); + HotResourceSummary summary = + new HotResourceSummary( + ResourceUtil.OLD_GEN_HEAP_USAGE, + 70, + 71, + 60); + ResourceFlowUnit rfu = new ResourceFlowUnit(System.currentTimeMillis(), context, summary, true); + Node rca = new TestRca(); + SQLitePersistor sqlite = + new SQLitePersistor( + testLocation.toString(), baseFilename, String.valueOf(1), TimeUnit.MICROSECONDS, 0); + + + int numThreads = 100; + int numWrites = 50; + + List threads = new ArrayList<>(); + + for (int i = 0; i < numThreads; i++) { + threads.add(new Thread(() -> { + for (int j = 0; j < numWrites; j++) { + try { + sqlite.write(rca, rfu); + } catch (SQLException | IOException throwables) { + throwables.printStackTrace(); + } + } + })); + } + + for (Thread th : threads) { + th.start(); + try { + th.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } } diff --git a/src/test/resources/tmp/file_rotate/rca.test.file b/src/test/resources/tmp/file_rotate/rca.test.file deleted file mode 100644 index b2aa6b7be0f2d7dba01300b76782b74a78ae8b14..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 4096 zcmeHK&u`N(6t?5;M+FmyGWD#yWxFPY^ z@J}HmuDf#NIc=e7BqXlIPmb+;e$RgIdvW4iy*-Z=r{hctrsy-&Kp3N!lp+M(`2|fW z7StMB3SM^p(l*f39}o6Gfnbivm^_B&|C7%>veWMO@fV`l`-H#Fl*{w1kO99cgkW-3 zM;gbjV|xzu?4xss*2g<_$RE6-gQ4e~Ixf9*2N$+GqG!&C+V8wKgCWd!;S4>R7D-Z8 zJt`MY_-Z!cG)~nzF1V6$uri_|u!*uHq+wP-BnyKnOA20v*z~#zh&WvX*rzI>V!{0h zmvI*QIS;Zl%vY8MUzi^<#psmDAYyW{wtH;r!2fs;hPjeV1M@Ne;JMcPjdw4t?e^gz9+k<;ZxYxb;LuG`s#b4FB~5F(os#yWxFPY^ z@J}HmuDf#NIc=e7BqXlIPmb+;e$RgIdvW4iy*-Z=r{hctrsy-&Kp3N!lp+M(`2|fW z7StMB3SM^p(l*f39}o6Gfnbivm^_B&|C7%>veWMO@fV`l`-H#Fl*{w1kO99cgkW-3 zM;gbjV|xzu?4xss*2g<_$RE6-gQ4e~Ixf9*2N$+GqG!&C+V8wKgCWd!;S4>R7D-Z8 zJt`MY_-Z!cG)~nzF1V6$uri_|u!*uHq+wP-BnyKnOA20v*z~#zh&WvX*rzI>V!{0h zmvI*QIS;Zl%vY8MUzi^<#psmDAYyW{wtH;r!2fs;hPjeV1M@Ne;JMcPjdw4t?e^gz9+k<;ZxYxb;LuG`s#b4FB~5F(os