From 4f3232c73ea697f3a81b15bcfc66722828f325a0 Mon Sep 17 00:00:00 2001 From: Michael Stewart Date: Thu, 4 Apr 2024 18:29:12 +0100 Subject: [PATCH 1/2] IKASAN-2366 adding the ability to configure entities to be house kept once they have been harvested without waiting for the entity expiry. --- .../ErrorReportingAutoConfiguration.java | 10 +- .../dao/HibernateErrorManagementDao.java | 24 ++- .../ErrorReportingTestAutoConfiguration.java | 7 + .../dao/HibernateErrorManagementDaoTest.java | 50 +++++- ikasaneip/housekeeping/Readme.md | 5 + .../replay/ReplayAutoConfiguration.java | 13 +- .../ikasan/replay/dao/HibernateReplayDao.java | 19 +- .../replay/ReplayTestAutoConfiguration.java | 17 ++ .../replay/service/ReplayServiceTest.java | 155 +++++++++++----- .../spec/systemevent/SystemEventDao.java | 2 - .../SystemEventAutoConfiguration.java | 11 +- .../dao/HibernateSystemEventDao.java | 38 ++-- .../SystemEventTestAutoConfiguration.java | 10 ++ .../dao/HibernateSystemEventDaoTest.java | 69 ++++++++ .../org/ikasan/WiretapAutoConfiguration.java | 10 +- .../wiretap/dao/HibernateWiretapDao.java | 166 +++++++++++++++--- .../ikasan/WiretapTestAutoConfiguration.java | 7 + .../wiretap/dao/HibernateWiretapDaoTest.java | 42 ++++- .../src/test/resources/logback-test.xml | 2 +- 19 files changed, 538 insertions(+), 119 deletions(-) diff --git a/ikasaneip/error-reporting/src/main/java/org/ikasan/error/reporting/ErrorReportingAutoConfiguration.java b/ikasaneip/error-reporting/src/main/java/org/ikasan/error/reporting/ErrorReportingAutoConfiguration.java index 5dfe11e632..78b1133275 100644 --- a/ikasaneip/error-reporting/src/main/java/org/ikasan/error/reporting/ErrorReportingAutoConfiguration.java +++ b/ikasaneip/error-reporting/src/main/java/org/ikasan/error/reporting/ErrorReportingAutoConfiguration.java @@ -12,6 +12,7 @@ import org.ikasan.spec.housekeeping.HousekeepService; import org.ikasan.spec.serialiser.SerialiserFactory; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -28,6 +29,9 @@ @ImportResource("/error-reporting-transaction.xml") public class ErrorReportingAutoConfiguration { + @Value("${errorReportingHousekeepingJob-deleteOnceHarvested:false}") + private boolean deleteOnceHarvested; + @Bean @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) public ErrorReportingService errorReportingService(ErrorReportingServiceFactory errorReportingServiceFactory) { @@ -42,7 +46,7 @@ public ErrorReportingServiceFactory errorReportingServiceFactory(SerialiserFacto @Bean(name = "errorReportingManagementService") public ErrorReportingManagementService errorReportingManagementService(ErrorReportingServiceDao errorReportingServiceDao - , ErrorManagementDao errorManagementDao) { + , @Qualifier("errorManagementDao") ErrorManagementDao errorManagementDao) { return new ErrorReportingManagementServiceImpl(errorManagementDao, errorReportingServiceDao); } @@ -51,9 +55,9 @@ public ErrorReportingServiceDao errorReportingServiceDao() { return new HibernateErrorReportingServiceDao(); } - @Bean + @Bean(name = "errorManagementDao") public ErrorManagementDao errorManagementDao() { - return new HibernateErrorManagementDao(); + return new HibernateErrorManagementDao(this.deleteOnceHarvested); } @Bean diff --git a/ikasaneip/error-reporting/src/main/java/org/ikasan/error/reporting/dao/HibernateErrorManagementDao.java b/ikasaneip/error-reporting/src/main/java/org/ikasan/error/reporting/dao/HibernateErrorManagementDao.java index 91947f380b..5e2e46f03a 100644 --- a/ikasaneip/error-reporting/src/main/java/org/ikasan/error/reporting/dao/HibernateErrorManagementDao.java +++ b/ikasaneip/error-reporting/src/main/java/org/ikasan/error/reporting/dao/HibernateErrorManagementDao.java @@ -73,6 +73,9 @@ public class HibernateErrorManagementDao implements ErrorManagementDao public static final String ERROR_OCCURRENCES_TO_DELETE_QUERY = "select uri from ErrorOccurrenceImpl eo " + " where eo.expiry < :" + NOW; + public static final String HARVESTED_ERROR_OCCURRENCES_TO_DELETE_QUERY = "select uri from ErrorOccurrenceImpl eo " + + " where eo.harvested = true order by eo.timestamp"; + public static final String ERROR_OCCURRENCE_DELETE_QUERY = "delete ErrorOccurrenceImpl eo " + " where eo.uri in(:" + EVENT_IDS + ")"; @@ -84,7 +87,21 @@ public class HibernateErrorManagementDao implements ErrorManagementDao @PersistenceContext(unitName = "error-reporting") private EntityManager entityManager; - @Override + private boolean deleteOnceHarvested; + + /** + * HibernateErrorManagementDao is a class that represents a data access object for managing errors in Hibernate. + * It provides methods for saving, deleting, and finding error occurrences, as well as housekeeping + * and updating harvested records. + * + * @param deleteOnceHarvested a boolean flag indicating whether harvested error occurrences should be deleted + * after being processed + */ + public HibernateErrorManagementDao(boolean deleteOnceHarvested) { + this.deleteOnceHarvested = deleteOnceHarvested; + } + + @Override public void saveErrorOccurrence(ErrorOccurrence errorOccurrence) { this.entityManager.persist(this.entityManager.contains(errorOccurrence) ? errorOccurrence : entityManager.merge(errorOccurrence)); @@ -243,8 +260,9 @@ public Long getNumberOfModuleErrors(String moduleName, boolean excluded, boolean @Override public void housekeep(final Integer numToHousekeep) { - Query query = this.entityManager.createQuery(ERROR_OCCURRENCES_TO_DELETE_QUERY); - query.setParameter(NOW, System.currentTimeMillis()); + Query query = this.entityManager.createQuery(this.deleteOnceHarvested + ? HARVESTED_ERROR_OCCURRENCES_TO_DELETE_QUERY : ERROR_OCCURRENCES_TO_DELETE_QUERY); + if(!this.deleteOnceHarvested)query.setParameter(NOW, System.currentTimeMillis()); query.setMaxResults(numToHousekeep); List errorUris = (List)query.getResultList(); diff --git a/ikasaneip/error-reporting/src/test/java/org/ikasan/error/reporting/ErrorReportingTestAutoConfiguration.java b/ikasaneip/error-reporting/src/test/java/org/ikasan/error/reporting/ErrorReportingTestAutoConfiguration.java index 38cb28d841..3ce13f2ca1 100644 --- a/ikasaneip/error-reporting/src/test/java/org/ikasan/error/reporting/ErrorReportingTestAutoConfiguration.java +++ b/ikasaneip/error-reporting/src/test/java/org/ikasan/error/reporting/ErrorReportingTestAutoConfiguration.java @@ -3,6 +3,8 @@ import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple; import com.arjuna.ats.jta.UserTransaction; import jakarta.persistence.EntityManagerFactory; +import org.ikasan.error.reporting.dao.ErrorManagementDao; +import org.ikasan.error.reporting.dao.HibernateErrorManagementDao; import org.ikasan.serialiser.converter.JmsMapMessageConverter; import org.ikasan.serialiser.converter.JmsTextMessageConverter; import org.ikasan.serialiser.service.SerialiserFactoryKryoImpl; @@ -30,6 +32,11 @@ @ImportResource("/test-transaction.xml") public class ErrorReportingTestAutoConfiguration { + @Bean(name = "deleteOnceHarvestedErrorManagementDao") + public ErrorManagementDao deleteOnceHarvestedErrorManagementDao() { + return new HibernateErrorManagementDao(true); + } + @Bean(name = {"ikasan.xads", "ikasan.ds"}) public DataSource ikasanDataSource() { DriverManagerDataSource dataSource = new DriverManagerDataSource(); diff --git a/ikasaneip/error-reporting/src/test/java/org/ikasan/error/reporting/dao/HibernateErrorManagementDaoTest.java b/ikasaneip/error-reporting/src/test/java/org/ikasan/error/reporting/dao/HibernateErrorManagementDaoTest.java index 5ccfcf5e21..2aedabe0fd 100644 --- a/ikasaneip/error-reporting/src/test/java/org/ikasan/error/reporting/dao/HibernateErrorManagementDaoTest.java +++ b/ikasaneip/error-reporting/src/test/java/org/ikasan/error/reporting/dao/HibernateErrorManagementDaoTest.java @@ -73,6 +73,9 @@ public class HibernateErrorManagementDaoTest { @Resource ErrorManagementDao errorManagementDao; + + @Resource + ErrorManagementDao deleteOnceHarvestedErrorManagementDao; @Resource ErrorReportingServiceDao errorReportingServiceDao; @@ -125,9 +128,7 @@ public void test_count_error_occurrence_for_module() this.errorReportingServiceDao.save(eo); this.errorReportingServiceDao.save(eo1); this.errorReportingServiceDao.save(eo2); - - System.out.println(this.errorManagementDao.getNumberOfModuleErrors("moduleName", false, false, new Date(System.currentTimeMillis() - 100000000), new Date(System.currentTimeMillis() + 100000000))); - + Assert.assertTrue(this.errorManagementDao.getNumberOfModuleErrors("moduleName", false, false, new Date(System.currentTimeMillis() - 100000000), new Date(System.currentTimeMillis() + 100000000)) == 3); } @@ -147,11 +148,50 @@ public void test_harvest_success() errorOccurrences.add(eo); } - Assert.assertEquals("Harvestable records == 1000", this.errorManagementDao.getHarvestableRecords(5000).size(), 1000); + Assert.assertEquals("Harvestable records == 1000", 1000, this.errorManagementDao.getHarvestableRecords(5000).size()); this.errorManagementDao.updateAsHarvested(errorOccurrences); - Assert.assertEquals("Harvestable records == 0", this.errorManagementDao.getHarvestableRecords(5000).size(), 0); + Assert.assertEquals("Harvestable records == 0", 0, this.errorManagementDao.getHarvestableRecords(5000).size()); + } + + @Test + @DirtiesContext + public void test_housekeep_success() + { + this.errorManagementDao.setHarvestQueryOrdered(true); + + for(int i=0; i<1000; i++) + { + ErrorOccurrenceImpl eo = new ErrorOccurrenceImpl("moduleName", "flowName", "flowElementName", "errorDetail", "errorMessage", "exceptionClass", 100, new byte[100], "errorString"); + this.errorReportingServiceDao.save(eo); + } + + Assert.assertEquals("Harvestable records == 1000", this.errorManagementDao.getHarvestableRecords(5000).size(), 1000); + + this.errorManagementDao.housekeep(100); + + Assert.assertEquals("Harvestable records == 900", this.errorManagementDao.getHarvestableRecords(5000).size(), 900); + } + + @Test + @DirtiesContext + public void test_housekeep_once_harvested_success() + { + this.deleteOnceHarvestedErrorManagementDao.setHarvestQueryOrdered(true); + + for(int i=0; i<1000; i++) + { + ErrorOccurrenceImpl eo = new ErrorOccurrenceImpl("moduleName", "flowName", "flowElementName", "errorDetail", "errorMessage", "exceptionClass", 100000000L, new byte[100], "errorString"); + eo.setHarvested(true); + this.errorReportingServiceDao.save(eo); + } + + Assert.assertEquals("Harvestable records == 1000", this.errorManagementDao.getHarvestableRecords(5000).size(), 1000); + + this.deleteOnceHarvestedErrorManagementDao.housekeep(100); + + Assert.assertEquals("Harvestable records == 900", this.errorManagementDao.getHarvestableRecords(5000).size(), 900); } @Test diff --git a/ikasaneip/housekeeping/Readme.md b/ikasaneip/housekeeping/Readme.md index e1cb1beb44..8b401eb37d 100644 --- a/ikasaneip/housekeeping/Readme.md +++ b/ikasaneip/housekeeping/Readme.md @@ -15,6 +15,7 @@ Each and every one of the jobs can be tuned and configured by following set of p - {jobName}-houseKeepingBatchSize defaults to 2500 - {jobName}-transactionBatchSize defaults to 200 - {jobName}-cronExpression defaults to '0 0/1 * * * ?' +- {jobName}-deleteOnceHarvested defaults to false (NB applicable for replayHousekeepingJob, wiretapHousekeepingJob, errorReportingHousekeepingJob, systemEventServiceHousekeepingJob) - {jobName}-enabled defaults to true @@ -27,24 +28,28 @@ Each and every one of the jobs can be tuned and configured by following set of p replayHousekeepingJob-houseKeepingBatchSize=2500 replayHousekeepingJob-transactionBatchSize defaults=200 replayHousekeepingJob-cronExpression=0 0/1 * * * ? +replayHousekeepingJob-deleteOnceHarvested=false replayHousekeepingJob-enabled=true # Wiretap housekeeping settings wiretapHousekeepingJob-houseKeepingBatchSize=2500 wiretapHousekeepingJob-transactionBatchSize defaults=200 wiretapHousekeepingJob-cronExpression=0 0/1 * * * ? +wiretapHousekeepingJob-deleteOnceHarvested=false wiretapHousekeepingJob-enabled=true # Error housekeeping settings errorReportingHousekeepingJob-houseKeepingBatchSize=2500 errorReportingHousekeepingJob-transactionBatchSize defaults=200 errorReportingHousekeepingJob-cronExpression=0 0/1 * * * ? +errorReportingHousekeepingJob-deleteOnceHarvested=false errorReportingHousekeepingJob-enabled=true # SystemEvents housekeeping settings systemEventServiceHousekeepingJob-houseKeepingBatchSize=2500 systemEventServiceHousekeepingJob-transactionBatchSize defaults=200 systemEventServiceHousekeepingJob-cronExpression=0 0/1 * * * ? +systemEventServiceHousekeepingJob-deleteOnceHarvested=false systemEventServiceHousekeepingJob-enabled=true # Duplicate Filter housekeeping settings diff --git a/ikasaneip/replay/src/main/java/org/ikasan/replay/ReplayAutoConfiguration.java b/ikasaneip/replay/src/main/java/org/ikasan/replay/ReplayAutoConfiguration.java index d53f5027d8..cecbba4404 100644 --- a/ikasaneip/replay/src/main/java/org/ikasan/replay/ReplayAutoConfiguration.java +++ b/ikasaneip/replay/src/main/java/org/ikasan/replay/ReplayAutoConfiguration.java @@ -7,6 +7,7 @@ import org.ikasan.spec.replay.*; import org.ikasan.spec.serialiser.SerialiserFactory; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.orm.jpa.JpaVendorAdapter; @@ -19,8 +20,11 @@ @Configuration public class ReplayAutoConfiguration { + @Value("${replayHousekeepingJob-deleteOnceHarvested:false}") + private boolean deleteOnceHarvested; + @Bean(name = "replayManagementService") - public ReplayManagementService replayManagementService(ReplayDao replayDao, ReplayAuditDao replayAuditDao) { + public ReplayManagementService replayManagementService(@Qualifier("replayDao")ReplayDao replayDao, ReplayAuditDao replayAuditDao) { return new ReplayManagementServiceImpl(replayDao, replayAuditDao); } @@ -30,13 +34,14 @@ public ReplayService replayService(ReplayAuditDao replayAuditDao) { } @Bean - public ReplayRecordService replayRecordService(ReplayDao replayDao, SerialiserFactory serialiserFactory) { + public ReplayRecordService replayRecordService(@Qualifier("replayDao")ReplayDao replayDao + , SerialiserFactory serialiserFactory) { return new ReplayRecordServiceImpl(serialiserFactory, replayDao); } - @Bean + @Bean(name = "replayDao") public ReplayDao replayDao() { - return new HibernateReplayDao(); + return new HibernateReplayDao(this.deleteOnceHarvested); } @Bean diff --git a/ikasaneip/replay/src/main/java/org/ikasan/replay/dao/HibernateReplayDao.java b/ikasaneip/replay/src/main/java/org/ikasan/replay/dao/HibernateReplayDao.java index aa504030c0..49b2ca7d9c 100644 --- a/ikasaneip/replay/src/main/java/org/ikasan/replay/dao/HibernateReplayDao.java +++ b/ikasaneip/replay/src/main/java/org/ikasan/replay/dao/HibernateReplayDao.java @@ -93,6 +93,8 @@ public class HibernateReplayDao implements ReplayDao,ReplayAuditDao getReplayEvents(List moduleNames, if (payloadContent != null && payloadContent.length() > 0) { - //criteria.add(Restrictions.like("eventAsString", payloadContent, MatchMode.ANYWHERE)); predicates.add( builder.like(root.get("eventAsString"),payloadContent)); } @@ -407,8 +413,9 @@ public Long getNumberReplayAuditEventsByAuditId(final Long id) @Override public void housekeep(final Integer numToHousekeep) { - Query query = this.entityManager.createQuery(REPLAY_EVENTS_TO_DELETE_QUERY); - query.setParameter(NOW, System.currentTimeMillis()); + Query query = this.entityManager.createQuery(this.deleteOnceHarvested ? + HARVESTED_REPLAY_EVENTS_TO_DELETE_QUERY : REPLAY_EVENTS_TO_DELETE_QUERY); + if(!this.deleteOnceHarvested)query.setParameter(NOW, System.currentTimeMillis()); query.setMaxResults(numToHousekeep); List replayEventIds = query.getResultList(); diff --git a/ikasaneip/replay/src/test/java/org/ikasan/replay/ReplayTestAutoConfiguration.java b/ikasaneip/replay/src/test/java/org/ikasan/replay/ReplayTestAutoConfiguration.java index 2bfbe969a0..2921aa9f9e 100644 --- a/ikasaneip/replay/src/test/java/org/ikasan/replay/ReplayTestAutoConfiguration.java +++ b/ikasaneip/replay/src/test/java/org/ikasan/replay/ReplayTestAutoConfiguration.java @@ -4,14 +4,20 @@ import com.arjuna.ats.jta.UserTransaction; import jakarta.jms.MapMessage; import jakarta.jms.TextMessage; +import org.ikasan.replay.dao.HibernateReplayDao; +import org.ikasan.replay.service.ReplayManagementServiceImpl; import org.ikasan.serialiser.converter.JmsMapMessageConverter; import org.ikasan.serialiser.converter.JmsTextMessageConverter; import org.ikasan.serialiser.service.SerialiserFactoryKryoImpl; import org.ikasan.spec.module.ModuleContainer; +import org.ikasan.spec.replay.ReplayAuditDao; +import org.ikasan.spec.replay.ReplayDao; +import org.ikasan.spec.replay.ReplayManagementService; import org.ikasan.spec.serialiser.Serialiser; import org.ikasan.spec.serialiser.SerialiserFactory; import org.jmock.Mockery; import org.jmock.imposters.ByteBuddyClassImposteriser; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.ImportResource; @@ -56,6 +62,17 @@ public ModuleContainer moduleContainer() { return mockery.mock(ModuleContainer.class); } + @Bean(name = "deleteOnceHarvestedReplayManagementService") + public ReplayManagementService deleteOnceHarvestedReplayManagementService(@Qualifier("deleteOnceHarvestedReplayDao")ReplayDao replayDao + , ReplayAuditDao replayAuditDao) { + return new ReplayManagementServiceImpl(replayDao, replayAuditDao); + } + + @Bean(name = "deleteOnceHarvestedReplayDao") + public ReplayDao deleteOnceHarvestedReplayDao() { + return new HibernateReplayDao(true); + } + @Bean(name = {"ikasan.xads", "ikasan.ds"}) public DataSource ikasanDataSource() { DriverManagerDataSource dataSource = new DriverManagerDataSource(); diff --git a/ikasaneip/replay/src/test/java/org/ikasan/replay/service/ReplayServiceTest.java b/ikasaneip/replay/src/test/java/org/ikasan/replay/service/ReplayServiceTest.java index 57170e114b..37735397df 100644 --- a/ikasaneip/replay/src/test/java/org/ikasan/replay/service/ReplayServiceTest.java +++ b/ikasaneip/replay/src/test/java/org/ikasan/replay/service/ReplayServiceTest.java @@ -90,10 +90,11 @@ public class ReplayServiceTest @Resource ReplayDao replayDao; + @Resource ReplayService replayService; - @Resource(name = "replayDao") ReplayAuditDao replayAuditDao; + @Resource ReplayManagementServiceImpl replayManagementService; - @Resource ReplayService replayService; + @Resource ReplayManagementServiceImpl deleteOnceHarvestedReplayManagementService; @Resource SerialiserFactory ikasanSerialiserFactory; @@ -113,8 +114,7 @@ public void addReplayEvents() for(int i=0; i<50; i++) { ReplayEventImpl replayEvent = new ReplayEventImpl("errorUri-" + i, "this is a test event".getBytes(), "this is a test event", "moduleName", "flowName", 0); - - + replayEvent.setHarvested(true); this.replayDao.saveOrUpdate(replayEvent); } @@ -123,6 +123,7 @@ public void addReplayEvents() for(int i=50; i<100; i++) { ReplayEventImpl replayEvent = new ReplayEventImpl("errorUri-" + i, "this is a test event".getBytes(), "this is a test event", "moduleName", "flowName", 0); + replayEvent.setHarvested(true); replayEvents.add(replayEvent); } @@ -130,7 +131,6 @@ public void addReplayEvents() stubFor(put(urlEqualTo("/moduleName/rest/replay/eventReplay/moduleName/flowName")) .withHeader(HttpHeaders.USER_AGENT, equalTo("moduleName")) - .willReturn(aResponse() .withStatus(200) .withHeader("Content-Type", "text/xml") @@ -176,98 +176,98 @@ public void test_replay_success() throws MalformedURLException this.replayService.replay(baseUri, replayEvents, "user", "password", "user", "this is a test!", contextMappings); - List replayAudits = this.replayAuditDao.getReplayAudits(null, null, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); + List replayAudits = this.replayManagementService.getReplayAudits(null, null, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); Assert.assertTrue(replayAudits.size() == 1); ReplayAuditImpl replayAudit = replayAudits.get(0); - replayAudit = this.replayAuditDao.getReplayAuditById(replayAudit.getId()); + replayAudit = this.replayManagementService.getReplayAuditById(replayAudit.getId()); - Assert.assertTrue(this.replayAuditDao.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); + Assert.assertTrue(this.replayManagementService.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); Assert.assertTrue(listener.count == 100); - replayAudits = this.replayAuditDao.getReplayAudits(moduleNames, null, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); + replayAudits = this.replayManagementService.getReplayAudits(moduleNames, null, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); Assert.assertTrue(replayAudits.size() == 1); replayAudit = replayAudits.get(0); - replayAudit = this.replayAuditDao.getReplayAuditById(replayAudit.getId()); + replayAudit = this.replayManagementService.getReplayAuditById(replayAudit.getId()); - Assert.assertTrue(this.replayAuditDao.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); + Assert.assertTrue(this.replayManagementService.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); Assert.assertTrue(listener.count == 100); - replayAudits = this.replayAuditDao.getReplayAudits(moduleNames, flowNames, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); + replayAudits = this.replayManagementService.getReplayAudits(moduleNames, flowNames, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); Assert.assertTrue(replayAudits.size() == 1); replayAudit = replayAudits.get(0); - replayAudit = this.replayAuditDao.getReplayAuditById(replayAudit.getId()); + replayAudit = this.replayManagementService.getReplayAuditById(replayAudit.getId()); - Assert.assertTrue(this.replayAuditDao.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); + Assert.assertTrue(this.replayManagementService.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); Assert.assertTrue(listener.count == 100); - replayAudits = this.replayAuditDao.getReplayAudits(null, flowNames, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); + replayAudits = this.replayManagementService.getReplayAudits(null, flowNames, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); Assert.assertTrue(replayAudits.size() == 1); replayAudit = replayAudits.get(0); - replayAudit = this.replayAuditDao.getReplayAuditById(replayAudit.getId()); + replayAudit = this.replayManagementService.getReplayAuditById(replayAudit.getId()); - Assert.assertTrue(this.replayAuditDao.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); + Assert.assertTrue(this.replayManagementService.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); Assert.assertTrue(listener.count == 100); - replayAudits = this.replayAuditDao.getReplayAudits(moduleNames, flowNames, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); + replayAudits = this.replayManagementService.getReplayAudits(moduleNames, flowNames, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); Assert.assertTrue(replayAudits.size() == 1); replayAudit = replayAudits.get(0); - replayAudit = this.replayAuditDao.getReplayAuditById(replayAudit.getId()); + replayAudit = this.replayManagementService.getReplayAuditById(replayAudit.getId()); - Assert.assertTrue(this.replayAuditDao.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); + Assert.assertTrue(this.replayManagementService.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); Assert.assertTrue(listener.count == 100); - replayAudits = this.replayAuditDao.getReplayAudits(moduleNames, flowNames, null, "user", new Date(0), new Date(System.currentTimeMillis() + 1000000)); + replayAudits = this.replayManagementService.getReplayAudits(moduleNames, flowNames, null, "user", new Date(0), new Date(System.currentTimeMillis() + 1000000)); Assert.assertTrue(replayAudits.size() == 1); replayAudit = replayAudits.get(0); - replayAudit = this.replayAuditDao.getReplayAuditById(replayAudit.getId()); + replayAudit = this.replayManagementService.getReplayAuditById(replayAudit.getId()); - Assert.assertTrue(this.replayAuditDao.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); + Assert.assertTrue(this.replayManagementService.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); Assert.assertTrue(listener.count == 100); - replayAudits = this.replayAuditDao.getReplayAudits(moduleNames, flowNames, "errorUri-10", "user", new Date(0), new Date(System.currentTimeMillis() + 1000000)); + replayAudits = this.replayManagementService.getReplayAudits(moduleNames, flowNames, "errorUri-10", "user", new Date(0), new Date(System.currentTimeMillis() + 1000000)); Assert.assertTrue(replayAudits.size() == 1); replayAudit = replayAudits.get(0); - replayAudit = this.replayAuditDao.getReplayAuditById(replayAudit.getId()); + replayAudit = this.replayManagementService.getReplayAuditById(replayAudit.getId()); - Assert.assertTrue(this.replayAuditDao.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); + Assert.assertTrue(this.replayManagementService.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); Assert.assertTrue(listener.count == 100); - replayAudits = this.replayAuditDao.getReplayAudits(moduleNames, flowNames, "errorUri-10", "user", new Date(0), new Date(System.currentTimeMillis() + 1000000)); + replayAudits = this.replayManagementService.getReplayAudits(moduleNames, flowNames, "errorUri-10", "user", new Date(0), new Date(System.currentTimeMillis() + 1000000)); Assert.assertTrue(replayAudits.size() == 1); replayAudit = replayAudits.get(0); - replayAudit = this.replayAuditDao.getReplayAuditById(replayAudit.getId()); + replayAudit = this.replayManagementService.getReplayAuditById(replayAudit.getId()); - Assert.assertTrue(this.replayAuditDao.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); + Assert.assertTrue(this.replayManagementService.getReplayAuditEventsByAuditId(replayAudit.getId()).size() == 100); Assert.assertTrue(listener.count == 100); } @Test @DirtiesContext(methodMode = DirtiesContext.MethodMode.BEFORE_METHOD) - public void test_delete_success() throws MalformedURLException + public void test_delete_success() { // expectations mockery.checking(new Expectations() @@ -305,34 +305,103 @@ public void test_delete_success() throws MalformedURLException this.replayService.replay(baseUri, replayEvents, "user", "password", "user", "this is a test!", contextMappings); this.replayService.replay(baseUri, replayEvents, "user", "password", "user", "this is a test!", contextMappings); - List replayAudits = this.replayAuditDao.getReplayAudits(null, null, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); + List replayAudits = this.replayManagementService.getReplayAudits(null, null, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); Long auditId1 = replayAudits.get(0).getId(); Long auditId2 = replayAudits.get(1).getId(); Long auditId3 = replayAudits.get(2).getId(); Long auditId4 = replayAudits.get(3).getId(); - List auditEvents = this.replayAuditDao.getReplayAuditEventsByAuditId(auditId1); - auditEvents.addAll(this.replayAuditDao.getReplayAuditEventsByAuditId(auditId2)); - auditEvents.addAll(this.replayAuditDao.getReplayAuditEventsByAuditId(auditId3)); - auditEvents.addAll(this.replayAuditDao.getReplayAuditEventsByAuditId(auditId4)); + List auditEvents = this.replayManagementService.getReplayAuditEventsByAuditId(auditId1); + auditEvents.addAll(this.replayManagementService.getReplayAuditEventsByAuditId(auditId2)); + auditEvents.addAll(this.replayManagementService.getReplayAuditEventsByAuditId(auditId3)); + auditEvents.addAll(this.replayManagementService.getReplayAuditEventsByAuditId(auditId4)); - this.replayDao.housekeep(1000); + this.replayManagementService.housekeep(); - replayAudits = this.replayAuditDao.getReplayAudits(null, null, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); + replayAudits = this.replayManagementService.getReplayAudits(null, null, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); replayEvents = this.replayDao.getReplayEvents (moduleNames, flowNames, "", "", new Date(0), new Date(System.currentTimeMillis() + 1000000), 100); - auditEvents = this.replayAuditDao.getReplayAuditEventsByAuditId(auditId1); - auditEvents.addAll(this.replayAuditDao.getReplayAuditEventsByAuditId(auditId2)); - auditEvents.addAll(this.replayAuditDao.getReplayAuditEventsByAuditId(auditId3)); - auditEvents.addAll(this.replayAuditDao.getReplayAuditEventsByAuditId(auditId4)); + auditEvents = this.replayManagementService.getReplayAuditEventsByAuditId(auditId1); + auditEvents.addAll(this.replayManagementService.getReplayAuditEventsByAuditId(auditId2)); + auditEvents.addAll(this.replayManagementService.getReplayAuditEventsByAuditId(auditId3)); + auditEvents.addAll(this.replayManagementService.getReplayAuditEventsByAuditId(auditId4)); Assert.assertTrue("Replay audits must be empty!", replayAudits.size() == 0); Assert.assertTrue("Replay events must be empty!", replayEvents.size() == 0); Assert.assertTrue("Replay audit events must be empty!", auditEvents.size() == 0); } - - + + + @Test + @DirtiesContext(methodMode = DirtiesContext.MethodMode.BEFORE_METHOD) + public void test_delete_after_harvesting_success() + { + // expectations + mockery.checking(new Expectations() + { + { + for(int i=0; i<100; i++) + { + // get each flow name + one(ikasanSerialiserFactory).getDefaultSerialiser(); + will(returnValue(serialiser)); + one(serialiser).deserialise("event".getBytes()); + will(returnValue("event".getBytes())); + } + + } + }); + + ReplayListenerImpl listener = new ReplayListenerImpl(); + this.replayService.addReplayListener(listener); + + ArrayList moduleNames = new ArrayList(); + moduleNames.add("moduleName"); + + ArrayList flowNames = new ArrayList(); + flowNames.add("flowName"); + + List replayEvents = this.replayDao.getReplayEvents + (moduleNames, flowNames, "", "", new Date(0), new Date(System.currentTimeMillis() + 1000000), 100); + + HashMap contextMappings = new HashMap<>(); + contextMappings.put("moduleName", "moduleName"); + + this.replayService.replay(baseUri, replayEvents, "user", "password", "user", "this is a test!", contextMappings); + this.replayService.replay(baseUri, replayEvents, "user", "password", "user", "this is a test!", contextMappings); + this.replayService.replay(baseUri, replayEvents, "user", "password", "user", "this is a test!", contextMappings); + this.replayService.replay(baseUri, replayEvents, "user", "password", "user", "this is a test!", contextMappings); + + List replayAudits = this.deleteOnceHarvestedReplayManagementService.getReplayAudits(null, null, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); + + Long auditId1 = replayAudits.get(0).getId(); + Long auditId2 = replayAudits.get(1).getId(); + Long auditId3 = replayAudits.get(2).getId(); + Long auditId4 = replayAudits.get(3).getId(); + + List auditEvents = this.deleteOnceHarvestedReplayManagementService.getReplayAuditEventsByAuditId(auditId1); + auditEvents.addAll(this.deleteOnceHarvestedReplayManagementService.getReplayAuditEventsByAuditId(auditId2)); + auditEvents.addAll(this.deleteOnceHarvestedReplayManagementService.getReplayAuditEventsByAuditId(auditId3)); + auditEvents.addAll(this.deleteOnceHarvestedReplayManagementService.getReplayAuditEventsByAuditId(auditId4)); + + this.deleteOnceHarvestedReplayManagementService.housekeep(); + + replayAudits = this.replayManagementService.getReplayAudits(null, null, null, null, new Date(0), new Date(System.currentTimeMillis() + 1000000)); + replayEvents = this.replayDao.getReplayEvents + (moduleNames, flowNames, "", "", new Date(0), new Date(System.currentTimeMillis() + 1000000), 100); + auditEvents = this.replayManagementService.getReplayAuditEventsByAuditId(auditId1); + auditEvents.addAll(this.replayManagementService.getReplayAuditEventsByAuditId(auditId2)); + auditEvents.addAll(this.replayManagementService.getReplayAuditEventsByAuditId(auditId3)); + auditEvents.addAll(this.replayManagementService.getReplayAuditEventsByAuditId(auditId4)); + + Assert.assertTrue("Replay audits must be empty!", replayAudits.size() == 0); + Assert.assertTrue("Replay events must be empty!", replayEvents.size() == 0); + Assert.assertTrue("Replay audit events must be empty!", auditEvents.size() == 0); + } + + + class ReplayListenerImpl implements ReplayListener { public int count = 0; diff --git a/ikasaneip/spec/service/system-event/src/main/java/org/ikasan/spec/systemevent/SystemEventDao.java b/ikasaneip/spec/service/system-event/src/main/java/org/ikasan/spec/systemevent/SystemEventDao.java index 0ec1330be8..a8fa936c3c 100644 --- a/ikasaneip/spec/service/system-event/src/main/java/org/ikasan/spec/systemevent/SystemEventDao.java +++ b/ikasaneip/spec/service/system-event/src/main/java/org/ikasan/spec/systemevent/SystemEventDao.java @@ -117,8 +117,6 @@ PagedSearchResult find(final int pageNo, final int pageSize, final String */ void setTransactionBatchSize(Integer transactionBatchSize); - void setHousekeepQuery(String housekeepQuery); - /** * Get (housekeepingBatchSize) harvestable records. * diff --git a/ikasaneip/system-event/src/main/java/org/ikasan/systemevent/SystemEventAutoConfiguration.java b/ikasaneip/system-event/src/main/java/org/ikasan/systemevent/SystemEventAutoConfiguration.java index 6e856462c8..34f6cd6684 100644 --- a/ikasaneip/system-event/src/main/java/org/ikasan/systemevent/SystemEventAutoConfiguration.java +++ b/ikasaneip/system-event/src/main/java/org/ikasan/systemevent/SystemEventAutoConfiguration.java @@ -30,17 +30,20 @@ public class SystemEventAutoConfiguration { @Value("${system.event.transaction.batch.size:1000}") private int systemEventTransactionBatchSize; + @Value("${systemEventServiceHousekeepingJob-deleteOnceHarvested:false}") + private boolean deleteOnceHarvested; + @Bean @DependsOn({"systemEventDao", "moduleContainer"}) - public SystemEventService systemEventService(SystemEventDao systemEventDao, ModuleContainer moduleContainer) { + public SystemEventService systemEventService(@Qualifier("systemEventDao") SystemEventDao systemEventDao, ModuleContainer moduleContainer) { return new SystemEventServiceImpl(systemEventDao, systemEventExpiryMinutes, moduleContainer); } - @Bean + @Bean(name = "systemEventDao") @DependsOn("systemEventEntityManager") public SystemEventDao systemEventDao() { - return new HibernateSystemEventDao(true - , systemEventHouseKeepingBatchSize, systemEventTransactionBatchSize); + return new HibernateSystemEventDao(true, systemEventHouseKeepingBatchSize + , systemEventTransactionBatchSize, this.deleteOnceHarvested); } @Bean diff --git a/ikasaneip/system-event/src/main/java/org/ikasan/systemevent/dao/HibernateSystemEventDao.java b/ikasaneip/system-event/src/main/java/org/ikasan/systemevent/dao/HibernateSystemEventDao.java index 68760259ab..439a7f6106 100644 --- a/ikasaneip/system-event/src/main/java/org/ikasan/systemevent/dao/HibernateSystemEventDao.java +++ b/ikasaneip/system-event/src/main/java/org/ikasan/systemevent/dao/HibernateSystemEventDao.java @@ -88,8 +88,13 @@ public class HibernateSystemEventDao implements SystemEventDao */ private static final String HOUSEKEEP_QUERY = "delete SystemEventImpl w where w.expiry <= :" + EXPIRY; + private static final String HARVESTED_HOUSEKEEP_QUERY = "delete SystemEventImpl w where w.harvested = true"; + public static final String SYSTEM_EVENTS_TO_DELETE_QUERY = - "select id from SystemEventImpl se " + " where se.expiry < :" + NOW; + "select id from SystemEventImpl se where se.expiry < :" + NOW; + + public static final String HARVESTED_SYSTEM_EVENTS_TO_DELETE_QUERY = + "select id from SystemEventImpl se where se.harvested = true order by se.timestamp"; public static final String SYSTEM_EVENTS_DELETE_QUERY = "delete SystemEventImpl se " + " where se.id in(:" + EVENT_IDS + ")"; @@ -118,21 +123,23 @@ public class HibernateSystemEventDao implements SystemEventDao */ private Integer transactionBatchSize = 1000; - private String housekeepQuery; + private boolean deleteOnceHarvested; /** * Constructor * * @param batchHousekeepDelete - pass true if you want to use batch deleting - * @param housekeepingBatchSize - batch size, only respected if set to use - * batching + * @param housekeepingBatchSize - batch size, only respected if set to use batching + * @param transactionBatchSize - the over all size of the batch to delete + * @param deleteOnceHarvested - flag to indicate if we should delete entities once harvested or wait until expiry */ public HibernateSystemEventDao(boolean batchHousekeepDelete, Integer housekeepingBatchSize, - Integer transactionBatchSize) { + Integer transactionBatchSize, boolean deleteOnceHarvested) { this(); this.batchHousekeepDelete = batchHousekeepDelete; this.housekeepingBatchSize = housekeepingBatchSize; this.transactionBatchSize = transactionBatchSize; + this.deleteOnceHarvested = deleteOnceHarvested; } /** @@ -308,8 +315,8 @@ static boolean restrictionExists(Object restrictionValue) { */ public void deleteExpired() { if ( !batchHousekeepDelete ) { - Query query = entityManager.createQuery(HOUSEKEEP_QUERY); - query.setParameter(EXPIRY, new Date()); + Query query = entityManager.createQuery(this.deleteOnceHarvested ? HARVESTED_HOUSEKEEP_QUERY : HOUSEKEEP_QUERY); + if(!this.deleteOnceHarvested)query.setParameter(EXPIRY, new Date()); query.executeUpdate(); } else { @@ -331,8 +338,9 @@ private void batchHousekeepDelete() { while (housekeepablesExist() && numberDeleted < this.transactionBatchSize) { numberDeleted += this.housekeepingBatchSize; - Query query = entityManager.createQuery(SYSTEM_EVENTS_TO_DELETE_QUERY); - query.setParameter(NOW, new Date()); + Query query = entityManager.createQuery(this.deleteOnceHarvested ? HARVESTED_SYSTEM_EVENTS_TO_DELETE_QUERY + : SYSTEM_EVENTS_TO_DELETE_QUERY); + if(!this.deleteOnceHarvested)query.setParameter(NOW, new Date()); query.setMaxResults(housekeepingBatchSize); List wiretapEventIds = (List) query.getResultList(); @@ -358,7 +366,12 @@ public boolean housekeepablesExist() CriteriaQuery criteriaQuery = builder.createQuery(Long.class); Root root = criteriaQuery.from(SystemEventImpl.class); - criteriaQuery.select(builder.count(root)).where(builder.lessThan(root.get("expiry"), new Date())); + if(this.deleteOnceHarvested) { + criteriaQuery.select(builder.count(root)).where(builder.equal(root.get("harvested"), true)); + } + else { + criteriaQuery.select(builder.count(root)).where(builder.lessThan(root.get("expiry"), new Date())); + } Query query = entityManager.createQuery(criteriaQuery); List rowCountList = query.getResultList(); @@ -396,11 +409,6 @@ public void setTransactionBatchSize(Integer transactionBatchSize) { this.transactionBatchSize = transactionBatchSize; } - @Override - public void setHousekeepQuery(String housekeepQuery) { - this.housekeepQuery = housekeepQuery; - } - @Override public List getHarvestableRecords(final int harvestingBatchSize) { CriteriaBuilder builder = entityManager.getCriteriaBuilder(); diff --git a/ikasaneip/system-event/src/test/java/org/ikasan/systemevent/SystemEventTestAutoConfiguration.java b/ikasaneip/system-event/src/test/java/org/ikasan/systemevent/SystemEventTestAutoConfiguration.java index 387802bba4..0cdf2921e2 100644 --- a/ikasaneip/system-event/src/test/java/org/ikasan/systemevent/SystemEventTestAutoConfiguration.java +++ b/ikasaneip/system-event/src/test/java/org/ikasan/systemevent/SystemEventTestAutoConfiguration.java @@ -4,9 +4,12 @@ import com.arjuna.ats.jta.UserTransaction; import jakarta.persistence.EntityManagerFactory; import org.ikasan.spec.module.ModuleContainer; +import org.ikasan.spec.systemevent.SystemEventDao; +import org.ikasan.systemevent.dao.HibernateSystemEventDao; import org.ikasan.systemevent.service.TestModuleContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.ImportResource; import org.springframework.jdbc.datasource.DriverManagerDataSource; import org.springframework.orm.jpa.JpaTransactionManager; @@ -24,6 +27,13 @@ @ImportResource("/transactions.xml") public class SystemEventTestAutoConfiguration { + @Bean(name = "deleteOnceHarvestedSystemEventDao") + @DependsOn("systemEventEntityManager") + public SystemEventDao deleteOnceHarvestedSystemEventDao() { + return new HibernateSystemEventDao(true, 100 + , 1000, true); + } + @Bean(name = {"ikasan.xads", "ikasan.ds"}) public DataSource ikasanXaDataSource() { DriverManagerDataSource dataSource = new DriverManagerDataSource(); diff --git a/ikasaneip/system-event/src/test/java/org/ikasan/systemevent/dao/HibernateSystemEventDaoTest.java b/ikasaneip/system-event/src/test/java/org/ikasan/systemevent/dao/HibernateSystemEventDaoTest.java index f04b650034..1fbd4aa857 100644 --- a/ikasaneip/system-event/src/test/java/org/ikasan/systemevent/dao/HibernateSystemEventDaoTest.java +++ b/ikasaneip/system-event/src/test/java/org/ikasan/systemevent/dao/HibernateSystemEventDaoTest.java @@ -73,6 +73,9 @@ public class HibernateSystemEventDaoTest @Autowired private SystemEventDao systemEventDao; + @Autowired + private SystemEventDao deleteOnceHarvestedSystemEventDao; + @Test public void test_deleteExpiredWithBatchHousekeepDeleteTrueAndTransactionBatchSize2000() { systemEventDao.setBatchHousekeepDelete(true); @@ -95,6 +98,72 @@ public void test_deleteExpiredWithBatchHousekeepDeleteTrueAndTransactionBatchSiz assertEquals(0, result.size()); } + @Test + public void test_deleteExpiredWithBatchHousekeepDeleteFalse() { + systemEventDao.setBatchHousekeepDelete(false); + + for(int i=0; i< 10000; i++) + { + systemEventDao.save(new SystemEventImpl("subject", "action", new Date() + , "actor", new Date(System.currentTimeMillis() - 1000000000))); + } + + while(systemEventDao.housekeepablesExist()) + { + this.systemEventDao.deleteExpired(); + } + + List result = systemEventDao.list(null,null,null,null); + + assertEquals(0, result.size()); + } + + @Test + public void test_deleteOnceHarvestedWithBatchHousekeepDeleteTrueAndTransactionBatchSize2000() { + deleteOnceHarvestedSystemEventDao.setBatchHousekeepDelete(true); + deleteOnceHarvestedSystemEventDao.setHousekeepingBatchSize(100); + deleteOnceHarvestedSystemEventDao.setTransactionBatchSize(2000); + + for(int i=0; i< 10000; i++) + { + SystemEventImpl systemEvent = new SystemEventImpl("subject", "action", new Date() + , "actor", new Date(System.currentTimeMillis() - 1000000000)); + systemEvent.setHarvested(true); + deleteOnceHarvestedSystemEventDao.save(systemEvent); + } + + while(deleteOnceHarvestedSystemEventDao.housekeepablesExist()) + { + this.deleteOnceHarvestedSystemEventDao.deleteExpired(); + } + + List result = systemEventDao.list(null,null,null,null); + + assertEquals(0, result.size()); + } + + @Test + public void test_deleteOnceHarvestedWithBatchHousekeepDeleteFalse() { + deleteOnceHarvestedSystemEventDao.setBatchHousekeepDelete(false); + + for(int i=0; i< 10000; i++) + { + SystemEventImpl systemEvent = new SystemEventImpl("subject", "action", new Date() + , "actor", new Date(System.currentTimeMillis() - 1000000000)); + systemEvent.setHarvested(true); + deleteOnceHarvestedSystemEventDao.save(systemEvent); + } + + while(deleteOnceHarvestedSystemEventDao.housekeepablesExist()) + { + this.deleteOnceHarvestedSystemEventDao.deleteExpired(); + } + + List result = systemEventDao.list(null,null,null,null); + + assertEquals(0, result.size()); + } + @Test public void test_deleteExpiredWithBatchHousekeepDeleteTrueAndTransactionBatchSize20000() { systemEventDao.setBatchHousekeepDelete(true); diff --git a/ikasaneip/wiretap/src/main/java/org/ikasan/WiretapAutoConfiguration.java b/ikasaneip/wiretap/src/main/java/org/ikasan/WiretapAutoConfiguration.java index 38c20425ab..75bb4b382a 100644 --- a/ikasaneip/wiretap/src/main/java/org/ikasan/WiretapAutoConfiguration.java +++ b/ikasaneip/wiretap/src/main/java/org/ikasan/WiretapAutoConfiguration.java @@ -42,6 +42,9 @@ public class WiretapAutoConfiguration { @Value("${wiretap.housekeeping.batch.size:1000}") private int wiretapHouseKeepingBatchSize; + @Value("${wiretapHousekeepingJob-deleteOnceHarvested:false}") + private boolean deleteOnceHarvested; + @Bean @DependsOn({"liquibase","moduleMetadataDashboardRestService"}) JobAwareFlowEventListener wiretapFlowEventListener(Map flowEventJobs, TriggerDao triggerDao @@ -51,7 +54,8 @@ JobAwareFlowEventListener wiretapFlowEventListener(Map flo } @Bean - WiretapService wiretapService(WiretapDao wiretapDao, ModuleService moduleService, WiretapEventFactory wiretapEventFactory) { + WiretapService wiretapService(@Qualifier("wiretapDao") WiretapDao wiretapDao + , ModuleService moduleService, WiretapEventFactory wiretapEventFactory) { return new WiretapServiceImpl(wiretapDao, moduleService, wiretapEventFactory); } @@ -69,9 +73,9 @@ MessageHistoryService messageHistoryService(MessageHistoryDao messageHistoryDao, return messageHistoryService; } - @Bean + @Bean(name = "wiretapDao") WiretapDao wiretapDao() { - return new HibernateWiretapDao(true, wiretapHouseKeepingBatchSize); + return new HibernateWiretapDao(true, wiretapHouseKeepingBatchSize, deleteOnceHarvested); } @Bean diff --git a/ikasaneip/wiretap/src/main/java/org/ikasan/wiretap/dao/HibernateWiretapDao.java b/ikasaneip/wiretap/src/main/java/org/ikasan/wiretap/dao/HibernateWiretapDao.java index b5bc43b58a..96f6f901e7 100644 --- a/ikasaneip/wiretap/src/main/java/org/ikasan/wiretap/dao/HibernateWiretapDao.java +++ b/ikasaneip/wiretap/src/main/java/org/ikasan/wiretap/dao/HibernateWiretapDao.java @@ -65,10 +65,12 @@ import java.util.*; /** - * Hibernate implementation of the WiretapDao - * - * @author Ikasan Development Team - * + * The HibernateWiretapDao class is an implementation of the WiretapDao interface using Hibernate as the underlying persistence framework. + * + * This class provides methods for saving wiretap events, finding wiretap events based on various criteria, deleting expired wiretaps, + * performing housekeeping operations, and managing configuration properties such as batch housekeeping and transaction batch size. + * + * @author Your Name */ public class HibernateWiretapDao implements WiretapDao { @@ -80,13 +82,13 @@ public class HibernateWiretapDao implements WiretapDao private static final String EXPIRY = "expiry"; private static final String EVENT_ID = "eventId"; - private static final String BATCH_SIZE = "batchSize"; public static final String EVENT_IDS = "eventIds"; public static final String CURRENT_DATE_TIME = "currentDateTime"; public static final String NOW = "now"; /** Query used for housekeeping expired persistence events */ private static final String HOUSEKEEP_DELETE_QUERY = "delete WiretapFlowEvent w where w.expiry <= :" + EXPIRY; + private static final String HARVESTED_HOUSEKEEP_DELETE_QUERY = "delete WiretapFlowEvent w where w.harvested=true"; /** Query for finding all persistence events with the same payloadId */ private static final String WIRETAP_IDS_FOR_GROUPED_EVENT_ID = "select w.id from WiretapFlowEvent w where w.eventId = :" + EVENT_ID; @@ -95,6 +97,9 @@ public class HibernateWiretapDao implements WiretapDao public static final String WIRETAP_EVENTS_TO_DELETE_QUERY = "select id from WiretapFlowEvent w " + " where w.expiry < :" + NOW; + public static final String HARVESTED_WIRETAP_EVENTS_TO_DELETE_QUERY = "select id from WiretapFlowEvent w " + + " where w.harvested= true ORDER BY w.timestamp asc"; + public static final String WIRETAP_EVENTS_DELETE_QUERY = "delete WiretapFlowEvent w " + " where w.id in(:" + EVENT_IDS + ")"; @@ -114,6 +119,7 @@ public class HibernateWiretapDao implements WiretapDao private String housekeepQuery; private boolean isHarvestQueryOrdered = false; + private boolean deleteOnceHarvested; /** * Constructor @@ -129,32 +135,43 @@ public HibernateWiretapDao() { * @param housekeepingBatchSize - batch size, only respected if set to use batching */ public HibernateWiretapDao(boolean batchHousekeepDelete, - Integer housekeepingBatchSize) { + Integer housekeepingBatchSize, boolean deleteOnceHarvested) { this(); this.batchHousekeepDelete = batchHousekeepDelete; this.housekeepingBatchSize = housekeepingBatchSize; + this.deleteOnceHarvested = deleteOnceHarvested; } + /** - * Save the wiretapFlowEvent + * Saves a WiretapEvent in the database. * - * @see - * WiretapDao#save(WiretapEvent) + * @param wiretapEvent the WiretapEvent object to be saved */ public void save(WiretapEvent wiretapEvent) { this.entityManager.persist(wiretapEvent); } + /** + * Saves a list of wiretap events. + * + * @param wiretapEvents the list of wiretap events to be saved + */ @Override public void save(List wiretapEvents) { wiretapEvents.forEach(wiretapEvent -> this.save(wiretapEvent)); } + /** - * Find the Wiretap by its Id + * Finds a WiretapEvent object by its identifier. + * + * @param identifier the unique identifier of the WiretapEvent + * + * @return the WiretapEvent object with the specified identifier, + * or null if no WiretapEvent is found */ - @SuppressWarnings("unchecked") @Override public WiretapEvent findById(final Long identifier) { WiretapFlowEvent wiretapEvent = this.entityManager.find(WiretapFlowEvent.class, identifier); @@ -200,7 +217,7 @@ public WiretapEvent findById(final Long identifier) { * * @return PagedSearchResult */ - @SuppressWarnings("unchecked") + @Override public PagedSearchResult findWiretapEvents(final int pageNo, final int pageSize, final String orderBy, final boolean orderAscending, final Set moduleNames, final String moduleFlow, final String componentName, final String eventId, final String payloadId, final Date fromDate, final Date untilDate, final String payloadContent) { @@ -219,10 +236,24 @@ public PagedSearchResult findWiretapEvents(final int pageNo, final } - /* (non-Javadoc) - * @see org.ikasan.spec.persistence.WiretapDao#findWiretapEvents(int, int, java.lang.String, boolean, java.util.Set, java.util.Set, java.util.Set, java.lang.String, java.lang.String, java.util.Date, java.util.Date, java.lang.String) - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) + /** + * Finds wiretap events based on the specified search criteria. + * + * @param pageNo The page number of the results. + * @param pageSize The number of results per page. + * @param orderBy The attribute to order the results by. + * @param orderAscending Specifies whether the results should be ordered in ascending order. + * @param moduleNames Set of module names to filter the results. + * @param moduleFlows Set of module flows to filter the results. + * @param componentNames Set of component names to filter the results. + * @param eventId The event ID to filter the results. + * @param payloadId The payload ID to filter the results. + * @param fromDate The start date to filter the results. + * @param untilDate The end date to filter the results. + * @param payloadContent The content of the payload to filter the results. + * + * @return A PagedSearchResult containing the wiretap events that match the specified criteria. + */ @Override public PagedSearchResult findWiretapEvents(final int pageNo, final int pageSize, final String orderBy, final boolean orderAscending, final Set moduleNames, final Set moduleFlows, final Set componentNames, final String eventId, final String payloadId, @@ -266,6 +297,24 @@ public PagedSearchResult findWiretapEvents(final int pageNo, final return new ArrayListPagedSearchResult(results, firstResult, rowCount); } + /** + * Calculates the total number of rows for a paged search based on the specified criteria. + * + * @param pageNo The page number to retrieve. + * @param pageSize The size of the page. + * @param orderBy The field to order the results by. + * @param orderAscending The flag indicating whether to sort in ascending order. + * @param moduleNames The set of module names. + * @param moduleFlows The set of module flows. + * @param componentNames The set of component names. + * @param eventId The event ID. + * @param payloadId The payload ID. + * @param fromDate The starting date. + * @param untilDate The ending date. + * @param payloadContent The payload content. + * + * @return The total number of rows. + */ private Long rowCount(final int pageNo, final int pageSize, final String orderBy, final boolean orderAscending, final Set moduleNames, final Set moduleFlows, final Set componentNames, final String eventId, final String payloadId, final Date fromDate, final Date untilDate, final String payloadContent){ @@ -292,6 +341,7 @@ private Long rowCount(final int pageNo, final int pageSize, final String orderBy /** * Create a criteria instance for each invocation of data or metadata queries. + * * @param builder * @param root * @return @@ -368,11 +418,12 @@ static final boolean restrictionExists(Object restrictionValue) /** * Delete all of the expired wiretaps */ + @Override public void deleteAllExpired() { if (!batchHousekeepDelete) { - Query query = this.entityManager.createQuery(HOUSEKEEP_DELETE_QUERY); - query.setParameter(EXPIRY, System.currentTimeMillis()); + Query query = this.entityManager.createQuery(deleteOnceHarvested? HARVESTED_HOUSEKEEP_DELETE_QUERY : HOUSEKEEP_DELETE_QUERY); + if(!deleteOnceHarvested)query.setParameter(EXPIRY, System.currentTimeMillis()); query.executeUpdate(); } else { @@ -397,8 +448,9 @@ private void batchHousekeepDelete() numberDeleted += this.housekeepingBatchSize; - Query query = this.entityManager.createQuery(WIRETAP_EVENTS_TO_DELETE_QUERY); - query.setParameter(NOW, System.currentTimeMillis()); + Query query = this.entityManager + .createQuery(deleteOnceHarvested ? HARVESTED_WIRETAP_EVENTS_TO_DELETE_QUERY : WIRETAP_EVENTS_TO_DELETE_QUERY); + if(!deleteOnceHarvested)query.setParameter(NOW, System.currentTimeMillis()); query.setMaxResults(housekeepingBatchSize); List wiretapEventIds = (List)query.getResultList(); @@ -418,13 +470,20 @@ private void batchHousekeepDelete() * * @return true if there is at least 1 expired WiretapFlowEvent */ + @Override public boolean housekeepablesExist() { CriteriaBuilder builder = this.entityManager.getCriteriaBuilder(); CriteriaQuery criteriaQuery = builder.createQuery(Long.class); Root root = criteriaQuery.from(WiretapFlowEvent.class); - criteriaQuery.select(builder.count(root)) - .where(builder.lessThan(root.get("expiry"),System.currentTimeMillis())); + if(this.deleteOnceHarvested) { + criteriaQuery.select(builder.count(root)) + .where(builder.equal(root.get("harvested"), true)); + } + else { + criteriaQuery.select(builder.count(root)) + .where(builder.lessThan(root.get("expiry"), System.currentTimeMillis())); + } Query query = this.entityManager.createQuery(criteriaQuery); @@ -439,6 +498,13 @@ public boolean housekeepablesExist() { return Boolean.valueOf(rowCount > 0); } + /** + * Retrieves a list of harvestable wiretap records. + * + * @param housekeepingBatchSize the maximum number of records to retrieve + * @return a list of harvestable wiretap records + */ + @Override public List getHarvestableRecords(final int housekeepingBatchSize) { CriteriaBuilder builder = this.entityManager.getCriteriaBuilder(); CriteriaQuery criteriaQuery = builder.createQuery(WiretapEvent.class); @@ -458,6 +524,12 @@ public List getHarvestableRecords(final int housekeepingBatchSize) return query.getResultList(); } + /** + * Update the given list of WiretapEvents as harvested. + * + * @param events The list of WiretapEvents to be updated + */ + @Override public void updateAsHarvested(List events) { List wiretapEventIds = new ArrayList(); @@ -477,48 +549,92 @@ public void updateAsHarvested(List events) { } } + + /** + * Returns the value of the batchHousekeepDelete flag. + * + * @return the value of the batchHousekeepDelete flag + */ + @Override public boolean isBatchHousekeepDelete() { return batchHousekeepDelete; } + /** + * Sets the flag for batch housekeeping deletion. + * + * @param batchHousekeepDelete true to enable batch deleting, false otherwise + */ + @Override public void setBatchHousekeepDelete(boolean batchHousekeepDelete) { this.batchHousekeepDelete = batchHousekeepDelete; } + /** + * Retrieves the batch size for housekeeping. + * + * @return The batch size for housekeeping + */ + @Override public Integer getHousekeepingBatchSize() { return housekeepingBatchSize; } + + /** + * Sets the batch size for housekeeping. + * + * @param housekeepingBatchSize the size of the batch for housekeeping + */ + @Override public void setHousekeepingBatchSize(Integer housekeepingBatchSize) { this.housekeepingBatchSize = housekeepingBatchSize; } + /** - * @return the transactionBatchSize + * Retrieves the transaction batch size. + * + * @return The transaction batch size */ + @Override public Integer getTransactionBatchSize() { return transactionBatchSize; } + /** - * @param transactionBatchSize the transactionBatchSize to set + * Sets the transaction batch size. + * + * @param transactionBatchSize The size of the transaction batch. */ + @Override public void setTransactionBatchSize(Integer transactionBatchSize) { this.transactionBatchSize = transactionBatchSize; } - @Override + /** + * Sets the housekeep query to be used for deleting expired wiretaps. + * + * @param housekeepQuery the SQL query for deleting expired wiretaps + */ + @Override public void setHousekeepQuery(String housekeepQuery) { this.housekeepQuery = housekeepQuery; } + /** + * Sets whether the harvest query should be ordered. + * + * @param isHarvestQueryOrdered true if the harvest query should be ordered, false otherwise + */ @Override public void setHarvestQueryOrdered(boolean isHarvestQueryOrdered) { this.isHarvestQueryOrdered = isHarvestQueryOrdered; diff --git a/ikasaneip/wiretap/src/test/java/org/ikasan/WiretapTestAutoConfiguration.java b/ikasaneip/wiretap/src/test/java/org/ikasan/WiretapTestAutoConfiguration.java index 921d5dca75..5b9bd773e0 100644 --- a/ikasaneip/wiretap/src/test/java/org/ikasan/WiretapTestAutoConfiguration.java +++ b/ikasaneip/wiretap/src/test/java/org/ikasan/WiretapTestAutoConfiguration.java @@ -7,6 +7,8 @@ import org.ikasan.spec.configuration.PlatformConfigurationService; import org.ikasan.spec.dashboard.DashboardRestService; import org.ikasan.spec.module.ModuleService; +import org.ikasan.spec.wiretap.WiretapDao; +import org.ikasan.wiretap.dao.HibernateWiretapDao; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.springframework.context.annotation.Bean; @@ -44,6 +46,11 @@ public WiretapTestAutoConfiguration() { MockitoAnnotations.openMocks(this); } + @Bean + WiretapDao wiretapDaoDeleteOnceHarvested() { + return new HibernateWiretapDao(true, 1000, true); + } + @Bean(name = {"ikasan.xads", "ikasan.ds"}) public DataSource ikasanXaDataSource() { DriverManagerDataSource dataSource = new DriverManagerDataSource(); diff --git a/ikasaneip/wiretap/src/test/java/org/ikasan/wiretap/dao/HibernateWiretapDaoTest.java b/ikasaneip/wiretap/src/test/java/org/ikasan/wiretap/dao/HibernateWiretapDaoTest.java index 67bac5ae6f..b832f9614d 100644 --- a/ikasaneip/wiretap/src/test/java/org/ikasan/wiretap/dao/HibernateWiretapDaoTest.java +++ b/ikasaneip/wiretap/src/test/java/org/ikasan/wiretap/dao/HibernateWiretapDaoTest.java @@ -73,7 +73,11 @@ public class HibernateWiretapDaoTest { /** Object being tested */ - @Resource private WiretapDao wiretapDao; + @Resource + private WiretapDao wiretapDao; + + @Resource + private WiretapDao wiretapDaoDeleteOnceHarvested; @Before public void setup() { @@ -87,6 +91,7 @@ public void setup() { timestamp++; + event.setHarvested(true); this.wiretapDao.save(event); } @@ -101,6 +106,7 @@ public void setup() { timestamp++; + event.setHarvested(true); events.add(event); } @@ -195,7 +201,7 @@ public void test_housekeepables_exits() } @Test - public void test_housekeep() + public void test_housekeep_batch() { wiretapDao.setBatchHousekeepDelete(true); wiretapDao.deleteAllExpired(); @@ -204,11 +210,11 @@ public void test_housekeep() PagedSearchResult events = this.wiretapDao.findWiretapEvents(0, 1, null, false, null, flowNames, null, null, null, null, null, null); - Assert.assertEquals("Wiretap event result size == 1", events.getResultSize(), 8000); + Assert.assertEquals("Wiretap event result size == 8000", events.getResultSize(), 8000); } @Test - public void test_housekeep_batch() + public void test_housekeep() { wiretapDao.setBatchHousekeepDelete(false); wiretapDao.deleteAllExpired(); @@ -217,7 +223,33 @@ public void test_housekeep_batch() PagedSearchResult events = this.wiretapDao.findWiretapEvents(0, 1, null, false, null, flowNames, null, null, null, null, null, null); - Assert.assertEquals("Wiretap event result size == 1", events.getResultSize(), 0); + Assert.assertEquals("Wiretap event result size == 0", events.getResultSize(), 0); + } + + @Test + public void test_housekeep_delete_once_harvested() + { + this.wiretapDaoDeleteOnceHarvested.setBatchHousekeepDelete(false); + this.wiretapDaoDeleteOnceHarvested.deleteAllExpired(); + + HashSet flowNames = new HashSet<>(); + PagedSearchResult events = this.wiretapDao.findWiretapEvents(0, 1, null, false, null, + flowNames, null, null, null, null, null, null); + + Assert.assertEquals("Wiretap event result size == 0", events.getResultSize(), 0); + } + + @Test + public void test_housekeep_batch_delete_once_harvested() + { + this.wiretapDaoDeleteOnceHarvested.setBatchHousekeepDelete(true); + this.wiretapDaoDeleteOnceHarvested.deleteAllExpired(); + + HashSet flowNames = new HashSet<>(); + PagedSearchResult events = this.wiretapDaoDeleteOnceHarvested.findWiretapEvents(0, 1, null, false, null, + flowNames, null, null, null, null, null, null); + + Assert.assertEquals("Wiretap event result size == 1", events.getResultSize(), 8000); } @Test diff --git a/ikasaneip/wiretap/src/test/resources/logback-test.xml b/ikasaneip/wiretap/src/test/resources/logback-test.xml index 941a44f202..10b8307e39 100644 --- a/ikasaneip/wiretap/src/test/resources/logback-test.xml +++ b/ikasaneip/wiretap/src/test/resources/logback-test.xml @@ -12,7 +12,7 @@ - + From 2031f1729f42f604f8037cc9a0a8d7dba100d3ed Mon Sep 17 00:00:00 2001 From: Michael Stewart Date: Thu, 4 Apr 2024 18:31:24 +0100 Subject: [PATCH 2/2] IKASAN-2366 reverting change to the logger config. --- ikasaneip/wiretap/src/test/resources/logback-test.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ikasaneip/wiretap/src/test/resources/logback-test.xml b/ikasaneip/wiretap/src/test/resources/logback-test.xml index 10b8307e39..941a44f202 100644 --- a/ikasaneip/wiretap/src/test/resources/logback-test.xml +++ b/ikasaneip/wiretap/src/test/resources/logback-test.xml @@ -12,7 +12,7 @@ - +