Skip to content

Commit

Permalink
fix: flytter KJOERT->FERDIG i egen transaksjon. Trenger ikke kjørs of…
Browse files Browse the repository at this point in the history
…te - default blir nå hvert 5min per node. dvs. 1x per min hvis det er 5 noder (#34)
  • Loading branch information
frode-carlsen authored Apr 18, 2020
1 parent 672f8f6 commit 4c7cef7
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class TaskManager implements AppServiceHandler {
/**
* Future for å kunne kansellere polling.
*/
private ScheduledFuture<?> pollingServiceScheduledFuture;
private List<ScheduledFuture<?>> pollingServiceScheduledFutures;

/**
* Implementasjon av dispatcher for å faktisk kjøre tasks.
Expand Down Expand Up @@ -120,16 +120,17 @@ private static ProsessTaskDispatcher selectProsessTaskDispatcher(Instance<Proses
return dispatcher.get();
} else if (dispatcher.isAmbiguous()) {
List<ProsessTaskDispatcher> dispatcherList = new ArrayList<>();
for(var disp : dispatcher) {
if(!(disp instanceof DefaultProsessTaskDispatcher)) {
for (var disp : dispatcher) {
if (!(disp instanceof DefaultProsessTaskDispatcher)) {
dispatcherList.add(disp);
}
}
if(dispatcherList.size() == 1) {
if (dispatcherList.size() == 1) {
return dispatcherList.get(0);
} else {
// kast exception har fler enn 2 instanser tilgjengelig, vet ikke hvilken vi skal velge
throw new IllegalArgumentException("Utvikler-feil: har flere mulige instanser å velge mellom, vet ikke hvilken som skal benyttes: "+ dispatcherList);
throw new IllegalArgumentException(
"Utvikler-feil: har flere mulige instanser å velge mellom, vet ikke hvilken som skal benyttes: " + dispatcherList);
}
} else {
throw new IllegalArgumentException("Utvikler-feil: skal ikke komme hit (unsatifisied dependency) - har ingen ProsessTaskDispatcher");
Expand Down Expand Up @@ -175,9 +176,9 @@ public synchronized void start() {

@Override
public synchronized void stop() {
if (pollingServiceScheduledFuture != null) {
pollingServiceScheduledFuture.cancel(true);
pollingServiceScheduledFuture = null;
if (pollingServiceScheduledFutures != null) {
pollingServiceScheduledFutures.forEach(s -> s.cancel(true));
pollingServiceScheduledFutures = null;
}
if (runTaskService != null) {
runTaskService.stop();
Expand All @@ -186,15 +187,16 @@ public synchronized void stop() {
}

synchronized void startPollerThread() {
if (pollingServiceScheduledFuture != null) {
if (pollingServiceScheduledFutures != null) {
throw new IllegalStateException("Service allerede startet, stopp først");//$NON-NLS-1$
}
if (pollingService == null) {
this.pollingService = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory(threadPoolNamePrefix + "-poller", false)); //$NON-NLS-1$
}
this.pollingServiceScheduledFuture = pollingService.scheduleWithFixedDelay(new PollAvailableTasks(), delayBetweenPollingMillis / 2,
delayBetweenPollingMillis, TimeUnit.MILLISECONDS); // NOSONAR
this.pollingServiceScheduledFutures = List.of(
pollingService.scheduleWithFixedDelay(new PollAvailableTasks(), delayBetweenPollingMillis / 2, delayBetweenPollingMillis, TimeUnit.MILLISECONDS),
pollingService.scheduleWithFixedDelay(new MoveToDonePartition(), 30 * 1000L, 5 * 60 * 1000L, TimeUnit.MILLISECONDS));
}

synchronized void startTaskThreads() {
Expand All @@ -211,8 +213,6 @@ interface ReadTaskFunksjon extends Function<ProsessTaskEntitet, IdentRunnable> {
* Poller for tasks og logger jevnlig om det ikke er ledig kapasitet (i in-memory queue) eller ingen tasks funnet (i db).
*/
protected synchronized List<IdentRunnable> pollForAvailableTasks() {

taskManagerRepository.flyttAlleKjoertTilFerdig();

LocalDateTime now = LocalDateTime.now();

Expand Down Expand Up @@ -264,7 +264,7 @@ List<IdentRunnable> pollTasksFunksjon(int numberOfTasksToPoll, ReadTaskFunksjon
int numberOfTasksStillToGo = numberOfTasksToPoll;

Set<Long> inmemoryTaskIds = getRunTaskService().getTaskIds();

List<ProsessTaskEntitet> tasksEntiteter = taskManagerRepository
.pollNesteScrollingUpdate(numberOfTasksStillToGo, waitTimeBeforeNextPollingAttemptSecs, inmemoryTaskIds);

Expand Down Expand Up @@ -421,6 +421,32 @@ private static long getSystemPropertyWithLowerBoundry(String key, long defaultVa
return systemPropertyValue;
}

/** Flytter fra KJOERT til FERDIG status i en separat tråd/transaksjon. */
class MoveToDonePartition extends TransactionHandler<Void> implements Runnable {

@Override
public void run() {
try {
EntityManager entityManager = getTransactionManagerRepository().getEntityManager();
try {
super.apply(entityManager);
} finally {
CDI.current().destroy(entityManager);
}
} catch (Throwable t) { // NOSONAR
// logg, ikke rethrow feil her da det dreper trådene
log.error("Kunne ikke flytte KJOERT tasks til FERDIG partisjoner", t);
}
}

@Override
protected Void doWork(EntityManager entityManager) throws Exception {
getTransactionManagerRepository().moveToDonePartition();
return null;
}

}

/** Internal executor that also tracks ids of currently queue or running tasks. */
class IdentExecutorService {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ void oppdaterTaskUnderArbeid(Long prosessTaskId, LocalDateTime now) {
}

/** Markere tasks om er kjørt FERDIG. Dette har som konsekvens at det flytter tasks fra default partisjon til FERDIG partisjoner. */
void flyttAlleKjoertTilFerdig() {
void moveToDonePartition() {
String updateSql = "update PROSESS_TASK set status = 'FERDIG' WHERE status='KJOERT'";

@SuppressWarnings("unused")
Expand Down

0 comments on commit 4c7cef7

Please sign in to comment.