Skip to content

Commit

Permalink
Optimalisering av polling for postgres (#634)
Browse files Browse the repository at this point in the history
  • Loading branch information
jolarsen authored Jun 17, 2024
1 parent 24e9934 commit fa70844
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,12 @@ public class TaskManagerRepositoryImpl {
private static final String JAVAX_PERSISTENCE_CACHE_STORE_MODE = "jakarta.persistence.cache.storeMode";
private static final String REFRESH = "REFRESH";

private static String SQL_FRA_FIL;

private static String POSTGRESQL_FRA_FIL;

private final String jvmUniqueProcessName = getJvmUniqueProcessName();
private final String sqlFraFil = getSqlFraFil(TaskManager.class.getSimpleName() + "_pollTask.sql");
private String sqlFraFil;

private final EntityManager entityManager;

Expand All @@ -80,9 +84,26 @@ String getSqlForPolling() {
}

String getSqlForPollingTemplate() {
if (sqlFraFil == null) {
sqlFraFil = getSqlForPollingTemplate(entityManager);
}
return sqlFraFil;
}

private static synchronized String getSqlForPollingTemplate(EntityManager entityManager) {
if (DatabaseUtil.isPostgres(entityManager)) {
if (POSTGRESQL_FRA_FIL == null) {
POSTGRESQL_FRA_FIL = getSqlFraFil(TaskManager.class.getSimpleName() + "postgres_pollTask.sql");
}
return POSTGRESQL_FRA_FIL;
} else {
if (SQL_FRA_FIL == null) {
SQL_FRA_FIL = getSqlFraFil(TaskManager.class.getSimpleName() + "_pollTask.sql");
}
return SQL_FRA_FIL;
}
}

static String getSqlFraFil(String filNavn) {
try (var is = TaskManager.class.getResourceAsStream(filNavn);
var s = is == null ? null : new Scanner(is, StandardCharsets.UTF_8)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Her ligger all logikk for å fordele tasker på tvers av flere pollere,
* plukke tasker avhengig av om der klare for å kjøres (status + neste_kjoering_etter tid er passert), og
* besørge at tasker kjøres i rekkefølge de var ment (sekvensielt eller parallellt).
* <p>
* SELECT'en er basert på to ting som er verdt å forstå når man leser denne.
* <ul>
* <li>SELECT FOR UPDATE SKIP LOCKED: Benyttes til å spre tasker på tvers av flere pollere slik at de ikke går i
* beina på hverandre. Gir dermed mulighet for økt skalerbarhet og fleksible kjøring</li>
* <li>Non-ANSI optimalisering for Postgres: Bruker DISTINCT ON for å finne første sekvensnummer i en gruppe, i common table expression (with)
* Når det er skrevet slik, velger postgresql å kjøre det aggregeringen i CTE først, som gjør at spørringen er raskere
* (spesielt når grupper har mange sekvensnummer). </li>
* </ul>
*/
WITH foerste_prosesstasker AS (
SELECT DISTINCT ON (task_gruppe) task_gruppe, task_sekvens
FROM prosess_task
-- bruker dette istdf. (status NOT IN('FERDIG', 'KJOERT')). Innført for å bruke partisjoner med minst data, unngår skanning av alle partisjoner
WHERE status IN ('FEILET', 'KLAR', 'VENTER_SVAR', 'SUSPENDERT', 'VETO')
ORDER BY task_gruppe, length(task_sekvens), task_sekvens)
SELECT pt.*
FROM prosess_task pt
join foerste_prosesstasker fp on fp.task_gruppe = pt.task_gruppe and fp.task_sekvens = pt.task_sekvens
WHERE pt.status = 'KLAR'
-- fjerner de som har mindre enn maks antall feilede forsøk
-- håndterer at kjøring ikke skjer før angitt tidstempel
AND (pt.neste_kjoering_etter IS NULL OR pt.neste_kjoering_etter < :neste_kjoering)
AND pt.id NOT IN (:skip_ids) -- sjekk mot skip ids i ytre loop ellers paavirkes rekkefølge
-- sorter etter prioritet og når de sist ble kjørt
ORDER BY prioritet DESC, siste_kjoering_ts ASC NULLS FIRST, ID ASC
FOR UPDATE SKIP LOCKED

0 comments on commit fa70844

Please sign in to comment.