Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimalisering av polling for postgres #634

Merged
merged 1 commit into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,12 @@ public class TaskManagerRepositoryImpl {
private static final String JAVAX_PERSISTENCE_CACHE_STORE_MODE = "jakarta.persistence.cache.storeMode";
private static final String REFRESH = "REFRESH";

private static String SQL_FRA_FIL;

private static String POSTGRESQL_FRA_FIL;

private final String jvmUniqueProcessName = getJvmUniqueProcessName();
private final String sqlFraFil = getSqlFraFil(TaskManager.class.getSimpleName() + "_pollTask.sql");
private String sqlFraFil;

private final EntityManager entityManager;

Expand All @@ -80,9 +84,26 @@ String getSqlForPolling() {
}

String getSqlForPollingTemplate() {
if (sqlFraFil == null) {
sqlFraFil = getSqlForPollingTemplate(entityManager);
}
return sqlFraFil;
}

private static synchronized String getSqlForPollingTemplate(EntityManager entityManager) {
if (DatabaseUtil.isPostgres(entityManager)) {
if (POSTGRESQL_FRA_FIL == null) {
POSTGRESQL_FRA_FIL = getSqlFraFil(TaskManager.class.getSimpleName() + "postgres_pollTask.sql");
}
return POSTGRESQL_FRA_FIL;
} else {
if (SQL_FRA_FIL == null) {
SQL_FRA_FIL = getSqlFraFil(TaskManager.class.getSimpleName() + "_pollTask.sql");
}
return SQL_FRA_FIL;
}
}

static String getSqlFraFil(String filNavn) {
try (var is = TaskManager.class.getResourceAsStream(filNavn);
var s = is == null ? null : new Scanner(is, StandardCharsets.UTF_8)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Her ligger all logikk for å fordele tasker på tvers av flere pollere,
* plukke tasker avhengig av om der klare for å kjøres (status + neste_kjoering_etter tid er passert), og
* besørge at tasker kjøres i rekkefølge de var ment (sekvensielt eller parallellt).
* <p>
* SELECT'en er basert på to ting som er verdt å forstå når man leser denne.
* <ul>
* <li>SELECT FOR UPDATE SKIP LOCKED: Benyttes til å spre tasker på tvers av flere pollere slik at de ikke går i
* beina på hverandre. Gir dermed mulighet for økt skalerbarhet og fleksible kjøring</li>
* <li>Non-ANSI optimalisering for Postgres: Bruker DISTINCT ON for å finne første sekvensnummer i en gruppe, i common table expression (with)
* Når det er skrevet slik, velger postgresql å kjøre det aggregeringen i CTE først, som gjør at spørringen er raskere
* (spesielt når grupper har mange sekvensnummer). </li>
* </ul>
*/
WITH foerste_prosesstasker AS (
SELECT DISTINCT ON (task_gruppe) task_gruppe, task_sekvens
FROM prosess_task
-- bruker dette istdf. (status NOT IN('FERDIG', 'KJOERT')). Innført for å bruke partisjoner med minst data, unngår skanning av alle partisjoner
WHERE status IN ('FEILET', 'KLAR', 'VENTER_SVAR', 'SUSPENDERT', 'VETO')
ORDER BY task_gruppe, length(task_sekvens), task_sekvens)
SELECT pt.*
FROM prosess_task pt
join foerste_prosesstasker fp on fp.task_gruppe = pt.task_gruppe and fp.task_sekvens = pt.task_sekvens
WHERE pt.status = 'KLAR'
-- fjerner de som har mindre enn maks antall feilede forsøk
-- håndterer at kjøring ikke skjer før angitt tidstempel
AND (pt.neste_kjoering_etter IS NULL OR pt.neste_kjoering_etter < :neste_kjoering)
AND pt.id NOT IN (:skip_ids) -- sjekk mot skip ids i ytre loop ellers paavirkes rekkefølge
-- sorter etter prioritet og når de sist ble kjørt
ORDER BY prioritet DESC, siste_kjoering_ts ASC NULLS FIRST, ID ASC
FOR UPDATE SKIP LOCKED