Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ikasan 2366 add housekeeping upon harvesting feature #1287

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.ikasan.spec.housekeeping.HousekeepService;
import org.ikasan.spec.serialiser.SerialiserFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -28,6 +29,9 @@
@ImportResource("/error-reporting-transaction.xml")
public class ErrorReportingAutoConfiguration
{
@Value("${errorReportingHousekeepingJob-deleteOnceHarvested:false}")
private boolean deleteOnceHarvested;

@Bean
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public ErrorReportingService errorReportingService(ErrorReportingServiceFactory errorReportingServiceFactory) {
Expand All @@ -42,7 +46,7 @@ public ErrorReportingServiceFactory errorReportingServiceFactory(SerialiserFacto

@Bean(name = "errorReportingManagementService")
public ErrorReportingManagementService errorReportingManagementService(ErrorReportingServiceDao errorReportingServiceDao
, ErrorManagementDao errorManagementDao) {
, @Qualifier("errorManagementDao") ErrorManagementDao errorManagementDao) {
return new ErrorReportingManagementServiceImpl(errorManagementDao, errorReportingServiceDao);
}

Expand All @@ -51,9 +55,9 @@ public ErrorReportingServiceDao errorReportingServiceDao() {
return new HibernateErrorReportingServiceDao();
}

@Bean
@Bean(name = "errorManagementDao")
public ErrorManagementDao errorManagementDao() {
return new HibernateErrorManagementDao();
return new HibernateErrorManagementDao(this.deleteOnceHarvested);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public class HibernateErrorManagementDao implements ErrorManagementDao
public static final String ERROR_OCCURRENCES_TO_DELETE_QUERY = "select uri from ErrorOccurrenceImpl eo " +
" where eo.expiry < :" + NOW;

public static final String HARVESTED_ERROR_OCCURRENCES_TO_DELETE_QUERY = "select uri from ErrorOccurrenceImpl eo " +
" where eo.harvested = true order by eo.timestamp";

public static final String ERROR_OCCURRENCE_DELETE_QUERY = "delete ErrorOccurrenceImpl eo " +
" where eo.uri in(:" + EVENT_IDS + ")";

Expand All @@ -84,7 +87,21 @@ public class HibernateErrorManagementDao implements ErrorManagementDao
@PersistenceContext(unitName = "error-reporting")
private EntityManager entityManager;

@Override
private boolean deleteOnceHarvested;

/**
* HibernateErrorManagementDao is a class that represents a data access object for managing errors in Hibernate.
* It provides methods for saving, deleting, and finding error occurrences, as well as housekeeping
* and updating harvested records.
*
* @param deleteOnceHarvested a boolean flag indicating whether harvested error occurrences should be deleted
* after being processed
*/
public HibernateErrorManagementDao(boolean deleteOnceHarvested) {
this.deleteOnceHarvested = deleteOnceHarvested;
}

@Override
public void saveErrorOccurrence(ErrorOccurrence errorOccurrence)
{
this.entityManager.persist(this.entityManager.contains(errorOccurrence) ? errorOccurrence : entityManager.merge(errorOccurrence));
Expand Down Expand Up @@ -243,8 +260,9 @@ public Long getNumberOfModuleErrors(String moduleName, boolean excluded, boolean

@Override
public void housekeep(final Integer numToHousekeep) {
Query query = this.entityManager.createQuery(ERROR_OCCURRENCES_TO_DELETE_QUERY);
query.setParameter(NOW, System.currentTimeMillis());
Query query = this.entityManager.createQuery(this.deleteOnceHarvested
? HARVESTED_ERROR_OCCURRENCES_TO_DELETE_QUERY : ERROR_OCCURRENCES_TO_DELETE_QUERY);
if(!this.deleteOnceHarvested)query.setParameter(NOW, System.currentTimeMillis());
query.setMaxResults(numToHousekeep);

List<Long> errorUris = (List<Long>)query.getResultList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
import com.arjuna.ats.jta.UserTransaction;
import jakarta.persistence.EntityManagerFactory;
import org.ikasan.error.reporting.dao.ErrorManagementDao;
import org.ikasan.error.reporting.dao.HibernateErrorManagementDao;
import org.ikasan.serialiser.converter.JmsMapMessageConverter;
import org.ikasan.serialiser.converter.JmsTextMessageConverter;
import org.ikasan.serialiser.service.SerialiserFactoryKryoImpl;
Expand Down Expand Up @@ -30,6 +32,11 @@
@ImportResource("/test-transaction.xml")
public class ErrorReportingTestAutoConfiguration
{
@Bean(name = "deleteOnceHarvestedErrorManagementDao")
public ErrorManagementDao deleteOnceHarvestedErrorManagementDao() {
return new HibernateErrorManagementDao(true);
}

@Bean(name = {"ikasan.xads", "ikasan.ds"})
public DataSource ikasanDataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public class HibernateErrorManagementDaoTest
{
@Resource
ErrorManagementDao errorManagementDao;

@Resource
ErrorManagementDao deleteOnceHarvestedErrorManagementDao;

@Resource
ErrorReportingServiceDao errorReportingServiceDao;
Expand Down Expand Up @@ -125,9 +128,7 @@ public void test_count_error_occurrence_for_module()
this.errorReportingServiceDao.save(eo);
this.errorReportingServiceDao.save(eo1);
this.errorReportingServiceDao.save(eo2);

System.out.println(this.errorManagementDao.getNumberOfModuleErrors("moduleName", false, false, new Date(System.currentTimeMillis() - 100000000), new Date(System.currentTimeMillis() + 100000000)));


Assert.assertTrue(this.errorManagementDao.getNumberOfModuleErrors("moduleName", false, false, new Date(System.currentTimeMillis() - 100000000), new Date(System.currentTimeMillis() + 100000000)) == 3);
}

Expand All @@ -147,11 +148,50 @@ public void test_harvest_success()
errorOccurrences.add(eo);
}

Assert.assertEquals("Harvestable records == 1000", this.errorManagementDao.getHarvestableRecords(5000).size(), 1000);
Assert.assertEquals("Harvestable records == 1000", 1000, this.errorManagementDao.getHarvestableRecords(5000).size());

this.errorManagementDao.updateAsHarvested(errorOccurrences);

Assert.assertEquals("Harvestable records == 0", this.errorManagementDao.getHarvestableRecords(5000).size(), 0);
Assert.assertEquals("Harvestable records == 0", 0, this.errorManagementDao.getHarvestableRecords(5000).size());
}

@Test
@DirtiesContext
public void test_housekeep_success()
{
this.errorManagementDao.setHarvestQueryOrdered(true);

for(int i=0; i<1000; i++)
{
ErrorOccurrenceImpl eo = new ErrorOccurrenceImpl("moduleName", "flowName", "flowElementName", "errorDetail", "errorMessage", "exceptionClass", 100, new byte[100], "errorString");
this.errorReportingServiceDao.save(eo);
}

Assert.assertEquals("Harvestable records == 1000", this.errorManagementDao.getHarvestableRecords(5000).size(), 1000);

this.errorManagementDao.housekeep(100);

Assert.assertEquals("Harvestable records == 900", this.errorManagementDao.getHarvestableRecords(5000).size(), 900);
}

@Test
@DirtiesContext
public void test_housekeep_once_harvested_success()
{
this.deleteOnceHarvestedErrorManagementDao.setHarvestQueryOrdered(true);

for(int i=0; i<1000; i++)
{
ErrorOccurrenceImpl eo = new ErrorOccurrenceImpl("moduleName", "flowName", "flowElementName", "errorDetail", "errorMessage", "exceptionClass", 100000000L, new byte[100], "errorString");
eo.setHarvested(true);
this.errorReportingServiceDao.save(eo);
}

Assert.assertEquals("Harvestable records == 1000", this.errorManagementDao.getHarvestableRecords(5000).size(), 1000);

this.deleteOnceHarvestedErrorManagementDao.housekeep(100);

Assert.assertEquals("Harvestable records == 900", this.errorManagementDao.getHarvestableRecords(5000).size(), 900);
}

@Test
Expand Down
5 changes: 5 additions & 0 deletions ikasaneip/housekeeping/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Each and every one of the jobs can be tuned and configured by following set of p
- {jobName}-houseKeepingBatchSize defaults to 2500
- {jobName}-transactionBatchSize defaults to 200
- {jobName}-cronExpression defaults to '0 0/1 * * * ?'
- {jobName}-deleteOnceHarvested defaults to false (NB applicable for replayHousekeepingJob, wiretapHousekeepingJob, errorReportingHousekeepingJob, systemEventServiceHousekeepingJob)
- {jobName}-enabled defaults to true


Expand All @@ -27,24 +28,28 @@ Each and every one of the jobs can be tuned and configured by following set of p
replayHousekeepingJob-houseKeepingBatchSize=2500
replayHousekeepingJob-transactionBatchSize defaults=200
replayHousekeepingJob-cronExpression=0 0/1 * * * ?
replayHousekeepingJob-deleteOnceHarvested=false
replayHousekeepingJob-enabled=true

# Wiretap housekeeping settings
wiretapHousekeepingJob-houseKeepingBatchSize=2500
wiretapHousekeepingJob-transactionBatchSize defaults=200
wiretapHousekeepingJob-cronExpression=0 0/1 * * * ?
wiretapHousekeepingJob-deleteOnceHarvested=false
wiretapHousekeepingJob-enabled=true

# Error housekeeping settings
errorReportingHousekeepingJob-houseKeepingBatchSize=2500
errorReportingHousekeepingJob-transactionBatchSize defaults=200
errorReportingHousekeepingJob-cronExpression=0 0/1 * * * ?
errorReportingHousekeepingJob-deleteOnceHarvested=false
errorReportingHousekeepingJob-enabled=true

# SystemEvents housekeeping settings
systemEventServiceHousekeepingJob-houseKeepingBatchSize=2500
systemEventServiceHousekeepingJob-transactionBatchSize defaults=200
systemEventServiceHousekeepingJob-cronExpression=0 0/1 * * * ?
systemEventServiceHousekeepingJob-deleteOnceHarvested=false
systemEventServiceHousekeepingJob-enabled=true

# Duplicate Filter housekeeping settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.ikasan.spec.replay.*;
import org.ikasan.spec.serialiser.SerialiserFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.orm.jpa.JpaVendorAdapter;
Expand All @@ -19,8 +20,11 @@
@Configuration
public class ReplayAutoConfiguration {

@Value("${replayHousekeepingJob-deleteOnceHarvested:false}")
private boolean deleteOnceHarvested;

@Bean(name = "replayManagementService")
public ReplayManagementService replayManagementService(ReplayDao replayDao, ReplayAuditDao replayAuditDao) {
public ReplayManagementService replayManagementService(@Qualifier("replayDao")ReplayDao replayDao, ReplayAuditDao replayAuditDao) {
return new ReplayManagementServiceImpl(replayDao, replayAuditDao);
}

Expand All @@ -30,13 +34,14 @@ public ReplayService replayService(ReplayAuditDao replayAuditDao) {
}

@Bean
public ReplayRecordService replayRecordService(ReplayDao replayDao, SerialiserFactory serialiserFactory) {
public ReplayRecordService replayRecordService(@Qualifier("replayDao")ReplayDao replayDao
, SerialiserFactory serialiserFactory) {
return new ReplayRecordServiceImpl(serialiserFactory, replayDao);
}

@Bean
@Bean(name = "replayDao")
public ReplayDao replayDao() {
return new HibernateReplayDao();
return new HibernateReplayDao(this.deleteOnceHarvested);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public class HibernateReplayDao implements ReplayDao<Long>,ReplayAuditDao<Replay
public static final String REPLAY_EVENTS_TO_DELETE_QUERY = "select id from ReplayEventImpl re " +
" where re.expiry < :" + NOW;

public static final String HARVESTED_REPLAY_EVENTS_TO_DELETE_QUERY = "select id from ReplayEventImpl re " +
" where re.harvested=true order by re.timestamp ASC";
public static final String REPLAY_EVENTS_DELETE_QUERY = "delete ReplayEventImpl re " +
" where re.id in(:" + EVENT_IDS + ")";

Expand All @@ -111,14 +113,19 @@ where re.id not in(select distinct rae.id.replayAuditId from ReplayAuditEventImp
" where w.id in(:" + EVENT_IDS + ")";

private boolean isHarvestQueryOrdered = false;
private boolean deleteOnceHarvested;

@PersistenceContext(unitName = "xa-replay")
private EntityManager entityManager;


public HibernateReplayDao()
{

/**
* Constructs a new instance of the {@link HibernateReplayDao} class with the specified deleteOnceHarvested flag.
*
* @param deleteOnceHarvested a boolean value indicating whether replay events should be deleted once harvested
*/
public HibernateReplayDao(boolean deleteOnceHarvested) {
this.deleteOnceHarvested = deleteOnceHarvested;
}

/* (non-Javadoc)
Expand Down Expand Up @@ -305,7 +312,6 @@ public List<ReplayEvent> getReplayEvents(List<String> moduleNames,

if (payloadContent != null && payloadContent.length() > 0)
{
//criteria.add(Restrictions.like("eventAsString", payloadContent, MatchMode.ANYWHERE));
predicates.add( builder.like(root.get("eventAsString"),payloadContent));
}

Expand Down Expand Up @@ -407,8 +413,9 @@ public Long getNumberReplayAuditEventsByAuditId(final Long id)
@Override
public void housekeep(final Integer numToHousekeep)
{
Query query = this.entityManager.createQuery(REPLAY_EVENTS_TO_DELETE_QUERY);
query.setParameter(NOW, System.currentTimeMillis());
Query query = this.entityManager.createQuery(this.deleteOnceHarvested ?
HARVESTED_REPLAY_EVENTS_TO_DELETE_QUERY : REPLAY_EVENTS_TO_DELETE_QUERY);
if(!this.deleteOnceHarvested)query.setParameter(NOW, System.currentTimeMillis());
query.setMaxResults(numToHousekeep);

List<Long> replayEventIds = query.getResultList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@
import com.arjuna.ats.jta.UserTransaction;
import jakarta.jms.MapMessage;
import jakarta.jms.TextMessage;
import org.ikasan.replay.dao.HibernateReplayDao;
import org.ikasan.replay.service.ReplayManagementServiceImpl;
import org.ikasan.serialiser.converter.JmsMapMessageConverter;
import org.ikasan.serialiser.converter.JmsTextMessageConverter;
import org.ikasan.serialiser.service.SerialiserFactoryKryoImpl;
import org.ikasan.spec.module.ModuleContainer;
import org.ikasan.spec.replay.ReplayAuditDao;
import org.ikasan.spec.replay.ReplayDao;
import org.ikasan.spec.replay.ReplayManagementService;
import org.ikasan.spec.serialiser.Serialiser;
import org.ikasan.spec.serialiser.SerialiserFactory;
import org.jmock.Mockery;
import org.jmock.imposters.ByteBuddyClassImposteriser;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
Expand Down Expand Up @@ -56,6 +62,17 @@ public ModuleContainer moduleContainer() {
return mockery.mock(ModuleContainer.class);
}

@Bean(name = "deleteOnceHarvestedReplayManagementService")
public ReplayManagementService deleteOnceHarvestedReplayManagementService(@Qualifier("deleteOnceHarvestedReplayDao")ReplayDao replayDao
, ReplayAuditDao replayAuditDao) {
return new ReplayManagementServiceImpl(replayDao, replayAuditDao);
}

@Bean(name = "deleteOnceHarvestedReplayDao")
public ReplayDao deleteOnceHarvestedReplayDao() {
return new HibernateReplayDao(true);
}

@Bean(name = {"ikasan.xads", "ikasan.ds"})
public DataSource ikasanDataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
Expand Down
Loading