Skip to content

Commit

Permalink
Merge WorkflowStateCleanupTask into HouseKeepingTask
Browse files Browse the repository at this point in the history
Signed-off-by: nscuro <[email protected]>
  • Loading branch information
nscuro committed Aug 7, 2024
1 parent 74abdb2 commit b15bd91
Show file tree
Hide file tree
Showing 13 changed files with 340 additions and 509 deletions.
3 changes: 0 additions & 3 deletions src/main/java/org/dependencytrack/common/ConfigKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public enum ConfigKey implements Config.Key {
CRON_EXPRESSION_FOR_FORTIFY_SSC_SYNC("task.cron.fortify.ssc.sync", "0 2 * * *"),
CRON_EXPRESSION_FOR_DEFECT_DOJO_SYNC("task.cron.defectdojo.sync", "0 2 * * *"),
CRON_EXPRESSION_FOR_KENNA_SYNC("task.cron.kenna.sync", "0 2 * * *"),
CRON_EXPRESSION_FOR_WORKFLOW_STATE_CLEANUP_TASK("task.cron.workflow.state.cleanup", "*/15 * * * *"),
CRON_EXPRESSION_FOR_INTEGRITY_META_INITIALIZER_TASK("task.cron.integrityInitializer", "0 */12 * * *"),
CRON_EXPRESSION_FOR_HOUSEKEEPING_TASK("task.cron.housekeeping", "45 * * * *"),
TASK_SCHEDULER_INITIAL_DELAY("task.scheduler.initial.delay", "180000"),
Expand All @@ -68,8 +67,6 @@ public enum ConfigKey implements Config.Key {
TASK_COMPONENT_IDENTIFICATION_LOCK_AT_LEAST_FOR("task.componentIdentification.lockAtLeastForInMillis", "90000"),
TASK_LDAP_SYNC_LOCK_AT_MOST_FOR("task.ldapSync.lockAtMostForInMillis", "900000"),
TASK_LDAP_SYNC_LOCK_AT_LEAST_FOR("task.ldapSync.lockAtLeastForInMillis", "90000"),
TASK_WORKFLOW_STEP_CLEANUP_LOCK_AT_MOST_FOR("task.workflow.state.cleanup.lockAtMostForInMillis", String.valueOf(Duration.ofMinutes(15).toMillis())),
TASK_WORKFLOW_STEP_CLEANUP_LOCK_AT_LEAST_FOR("task.workflow.state.cleanup.lockAtLeastForInMillis", String.valueOf(Duration.ofMinutes(15).toMillis())),
TASK_PORTFOLIO_REPO_META_ANALYSIS_LOCK_AT_MOST_FOR("task.portfolio.repoMetaAnalysis.lockAtMostForInMillis", String.valueOf(Duration.ofMinutes(15).toMillis())),
TASK_PORTFOLIO_REPO_META_ANALYSIS_LOCK_AT_LEAST_FOR("task.portfolio.repoMetaAnalysis.lockAtLeastForInMillis", String.valueOf(Duration.ofMinutes(5).toMillis())),
TASK_PORTFOLIO_VULN_ANALYSIS_LOCK_AT_MOST_FOR("task.portfolio.vulnAnalysis.lockAtMostForInMillis", String.valueOf(Duration.ofMinutes(15).toMillis())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.dependencytrack.tasks.TaskScheduler;
import org.dependencytrack.tasks.VexUploadProcessingTask;
import org.dependencytrack.tasks.VulnerabilityAnalysisTask;
import org.dependencytrack.tasks.WorkflowStateCleanupTask;
import org.dependencytrack.tasks.metrics.PortfolioMetricsUpdateTask;
import org.dependencytrack.tasks.metrics.ProjectMetricsUpdateTask;
import org.dependencytrack.tasks.metrics.VulnerabilityMetricsUpdateTask;
Expand Down Expand Up @@ -96,7 +95,6 @@ public void contextInitialized(final ServletContextEvent event) {
EVENT_SERVICE.subscribe(EpssMirrorEvent.class, EpssMirrorTask.class);
EVENT_SERVICE.subscribe(ComponentPolicyEvaluationEvent.class, PolicyEvaluationTask.class);
EVENT_SERVICE.subscribe(ProjectPolicyEvaluationEvent.class, PolicyEvaluationTask.class);
EVENT_SERVICE.subscribe(WorkflowStateCleanupEvent.class, WorkflowStateCleanupTask.class);
EVENT_SERVICE.subscribe(IntegrityMetaInitializerEvent.class, IntegrityMetaInitializerTask.class);
EVENT_SERVICE.subscribe(IntegrityAnalysisEvent.class, IntegrityAnalysisTask.class);
EVENT_SERVICE.subscribe(HouseKeepingEvent.class, HouseKeepingTask.class);
Expand Down Expand Up @@ -130,7 +128,6 @@ public void contextDestroyed(final ServletContextEvent event) {
EVENT_SERVICE.unsubscribe(NistMirrorTask.class);
EVENT_SERVICE.unsubscribe(EpssMirrorTask.class);
EVENT_SERVICE.unsubscribe(PolicyEvaluationTask.class);
EVENT_SERVICE.unsubscribe(WorkflowStateCleanupTask.class);
EVENT_SERVICE.unsubscribe(IntegrityMetaInitializerTask.class);
EVENT_SERVICE.unsubscribe(IntegrityAnalysisTask.class);
EVENT_SERVICE.unsubscribe(VulnerabilityPolicyFetchTask.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1854,10 +1854,6 @@ public void updateWorkflowStateToFailed(WorkflowState workflowState, String fail
getWorkflowStateQueryManager().updateWorkflowStateToFailed(workflowState, failureReason);
}

/**
 * Checks whether a workflow step with the given status exists for a workflow token.
 * <p>
 * Thin delegate to the workflow-state query manager.
 *
 * @param token  the workflow token to match
 * @param step   the workflow step to match
 * @param status the workflow status to match
 * @return {@code true} when at least one matching workflow state exists, otherwise {@code false}
 */
public boolean hasWorkflowStepWithStatus(final UUID token, final WorkflowStep step, final WorkflowStatus status) {
    final var workflowStateQueryManager = getWorkflowStateQueryManager();
    return workflowStateQueryManager.hasWorkflowStepWithStatus(token, step, status);
}

/**
 * Fetches the {@link IntegrityMetaComponent} for a package URL.
 * <p>
 * Thin delegate to the integrity-meta query manager.
 *
 * @param purl the package URL to look up
 * @return the matching record; NOTE(review): presumably {@code null} when none exists — confirm in delegate
 */
public IntegrityMetaComponent getIntegrityMetaComponent(String purl) {
    final var integrityMetaQueryManager = getIntegrityMetaQueryManager();
    return integrityMetaQueryManager.getIntegrityMetaComponent(purl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class WorkflowStateQueryManager extends QueryManager implements IQueryManager {
Expand Down Expand Up @@ -329,20 +328,4 @@ public void createWorkflowSteps(UUID token) {
}
}

/**
 * Determines whether any {@link WorkflowState} record exists for the given
 * token, step, and status combination.
 * <p>
 * Uses a {@code count(this)} result projection so only a single number is
 * fetched from the datastore, never the matching records themselves.
 *
 * @param token  the workflow token to match
 * @param step   the workflow step to match
 * @param status the workflow status to match
 * @return {@code true} when at least one matching record exists, otherwise {@code false}
 */
public boolean hasWorkflowStepWithStatus(final UUID token, final WorkflowStep step, final WorkflowStatus status) {
    final Map<String, Object> queryParameters = Map.of(
            "token", token,
            "step", step,
            "status", status
    );
    final Query<WorkflowState> query = pm.newQuery(WorkflowState.class);
    try {
        query.setFilter("token == :token && step == :step && status == :status");
        query.setNamedParameters(queryParameters);
        query.setResult("count(this)");
        return query.executeResultUnique(Long.class) > 0;
    } finally {
        // Always release query resources, even when execution throws.
        query.closeAll();
    }
}

}
110 changes: 97 additions & 13 deletions src/main/java/org/dependencytrack/persistence/jdbi/WorkflowDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@
import org.dependencytrack.model.WorkflowState;
import org.dependencytrack.model.WorkflowStatus;
import org.dependencytrack.model.WorkflowStep;
import org.jdbi.v3.sqlobject.SqlObject;
import org.jdbi.v3.sqlobject.config.RegisterBeanMapper;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.GetGeneratedKeys;
import org.jdbi.v3.sqlobject.statement.SqlBatch;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public interface WorkflowDao {
public interface WorkflowDao extends SqlObject {

@SqlBatch("""
UPDATE "WORKFLOW_STATE"
Expand All @@ -46,15 +49,19 @@ public interface WorkflowDao {
""")
@GetGeneratedKeys("*")
@RegisterBeanMapper(WorkflowState.class)
List<WorkflowState> updateAllStates(@Bind WorkflowStep step,
@Bind("token") List<String> tokens,
@Bind("status") List<WorkflowStatus> statuses,
@Bind("failureReason") List<String> failureReasons);
List<WorkflowState> updateAllStates(
@Bind WorkflowStep step,
@Bind("token") List<String> tokens,
@Bind("status") List<WorkflowStatus> statuses,
@Bind("failureReason") List<String> failureReasons
);

default Optional<WorkflowState> updateState(final WorkflowStep step,
final String token,
final WorkflowStatus status,
final String failureReason) {
default Optional<WorkflowState> updateState(
final WorkflowStep step,
final String token,
final WorkflowStatus status,
final String failureReason
) {
final List<WorkflowState> updatedStates = updateAllStates(step, List.of(token), List.of(status), Collections.singletonList(failureReason));
if (updatedStates.isEmpty()) {
return Optional.empty();
Expand All @@ -70,9 +77,11 @@ default Optional<WorkflowState> updateState(final WorkflowStep step,
AND "STATUS" = :status
AND "TOKEN" = ANY(:tokens)
""")
Set<String> getTokensByStepAndStateAndTokenAnyOf(@Bind WorkflowStep step,
@Bind WorkflowStatus status,
@Bind Collection<String> tokens);
Set<String> getTokensByStepAndStateAndTokenAnyOf(
@Bind WorkflowStep step,
@Bind WorkflowStatus status,
@Bind Collection<String> tokens
);

@SqlBatch("""
WITH RECURSIVE
Expand All @@ -95,8 +104,83 @@ Set<String> getTokensByStepAndStateAndTokenAnyOf(@Bind WorkflowStep step,
UPDATE "WORKFLOW_STATE"
SET "STATUS" = 'CANCELLED'
, "UPDATED_AT" = NOW()
WHERE "ID" IN (SELECT "ID" FROM "CTE_CHILDREN")
WHERE "ID" = ANY(SELECT "ID" FROM "CTE_CHILDREN")
""")
void cancelAllChildren(@Bind WorkflowStep step, @Bind("token") List<String> tokens);

/**
 * Transitions all transitive child steps of the given parent workflow states to
 * {@code CANCELLED} and refreshes their {@code UPDATED_AT} timestamp.
 * <p>
 * The recursive CTE walks the {@code PARENT_STEP_ID} hierarchy, so grandchildren
 * and deeper descendants are cancelled as well, not just direct children.
 * <p>
 * Executed as a JDBI batch: one statement execution per element of {@code parentIds}.
 *
 * @param parentIds IDs of the parent {@code WORKFLOW_STATE} rows whose descendants shall be cancelled
 * @return number of affected rows per batch entry
 * @since 5.6.0
 */
@SqlBatch("""
        WITH RECURSIVE
        "CTE_CHILDREN" ("ID") AS (
            SELECT "ID"
            FROM "WORKFLOW_STATE"
            WHERE "PARENT_STEP_ID" = :parentId
            UNION ALL
            SELECT "CHILD"."ID"
            FROM "WORKFLOW_STATE" AS "CHILD"
            INNER JOIN "CTE_CHILDREN" AS "PARENT"
                ON "PARENT"."ID" = "CHILD"."PARENT_STEP_ID"
        )
        UPDATE "WORKFLOW_STATE"
        SET "STATUS" = 'CANCELLED'
          , "UPDATED_AT" = NOW()
        WHERE "ID" = ANY(SELECT "ID" FROM "CTE_CHILDREN")
        """)
int[] cancelAllChildrenByParentStepIdAnyOf(@Bind("parentId") List<Long> parentIds);

/**
 * Transitions all {@code PENDING} workflow steps that have not been updated for
 * at least {@code timeoutDuration} to {@code TIMED_OUT}.
 * <p>
 * Staleness is computed database-side via {@code AGE(NOW(), "UPDATED_AT")}.
 *
 * @param timeoutDuration duration of inactivity after which a pending step is considered timed out
 * @return number of updated rows
 * @since 5.6.0
 */
@SqlUpdate("""
        UPDATE "WORKFLOW_STATE"
        SET "STATUS" = 'TIMED_OUT'
          , "UPDATED_AT" = NOW()
        WHERE "STATUS" = 'PENDING'
          AND AGE(NOW(), "UPDATED_AT") >= :timeoutDuration
        """)
int transitionAllPendingStepsToTimedOutForTimeout(@Bind Duration timeoutDuration);

/**
 * Transitions all {@code TIMED_OUT} workflow steps that have not been updated for
 * at least {@code timeoutDuration} to {@code FAILED}, recording {@code 'Timed out'}
 * as failure reason.
 *
 * @param timeoutDuration duration of inactivity after which a timed-out step is considered failed
 * @return IDs of the workflow state rows that were transitioned
 * @since 5.6.0
 */
default List<Long> transitionAllTimedOutStepsToFailedForTimeout(final Duration timeoutDuration) {
    // NB: Can't use an annotated interface method here due to https://github.com/jdbi/jdbi/issues/1807.
    final var update = getHandle().createUpdate("""
            UPDATE "WORKFLOW_STATE"
            SET "STATUS" = 'FAILED'
              , "FAILURE_REASON" = 'Timed out'
              , "UPDATED_AT" = NOW()
            WHERE "STATUS" = 'TIMED_OUT'
              AND AGE(NOW(), "UPDATED_AT") >= :timeoutDuration
            RETURNING "ID"
            """);
    update.bind("timeoutDuration", timeoutDuration);
    final var updatedIds = update.executeAndReturnGeneratedKeys().mapTo(Long.class);
    return updatedIds.list();
}

/**
 * Deletes all workflows (i.e. every {@code WORKFLOW_STATE} row sharing a token)
 * whose most recent update is older than {@code retentionDuration}.
 * <p>
 * Workflows that still have steps in {@code PENDING} or {@code TIMED_OUT} status
 * are excluded, so in-flight work is never deleted.
 *
 * @param retentionDuration duration after which a completed workflow becomes eligible for deletion
 * @return number of deleted rows
 * @since 5.6.0
 */
@SqlUpdate("""
        WITH "CTE_ELIGIBLE_TOKENS" AS (
            SELECT "TOKEN"
            FROM "WORKFLOW_STATE" AS "WFS_PARENT"
            WHERE NOT EXISTS(
                SELECT 1
                FROM "WORKFLOW_STATE" AS "WFS"
                WHERE "WFS"."TOKEN" = "WFS_PARENT"."TOKEN"
                  AND "WFS"."STATUS" IN ('PENDING', 'TIMED_OUT'))
            GROUP BY "TOKEN"
            HAVING AGE(NOW(), MAX("UPDATED_AT")) >= :retentionDuration
        )
        DELETE
        FROM "WORKFLOW_STATE"
        WHERE "TOKEN" = ANY(SELECT "TOKEN" FROM "CTE_ELIGIBLE_TOKENS")
        """)
int deleteAllForRetention(@Bind Duration retentionDuration);

}
87 changes: 73 additions & 14 deletions src/main/java/org/dependencytrack/tasks/HouseKeepingTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,23 @@
import alpine.event.framework.Event;
import alpine.event.framework.Subscriber;
import org.dependencytrack.event.HouseKeepingEvent;
import org.dependencytrack.model.WorkflowStatus;
import org.dependencytrack.persistence.jdbi.VulnerabilityScanDao;
import org.dependencytrack.persistence.jdbi.WorkflowDao;
import org.dependencytrack.plugin.PluginManager;
import org.dependencytrack.storage.BomUploadStorage;
import org.dependencytrack.util.LockProvider;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import static org.dependencytrack.common.ConfigKey.BOM_UPLOAD_STORAGE_RETENTION_DURATION;
import static org.dependencytrack.common.ConfigKey.WORKFLOW_RETENTION_DURATION;
import static org.dependencytrack.common.ConfigKey.WORKFLOW_STEP_TIMEOUT_DURATION;
import static org.dependencytrack.persistence.jdbi.JdbiFactory.inJdbiTransaction;
import static org.dependencytrack.persistence.jdbi.JdbiFactory.useJdbiHandle;
import static org.dependencytrack.tasks.LockName.HOUSEKEEPING_TASK_LOCK;

/**
Expand Down Expand Up @@ -61,53 +68,105 @@ public void inform(final Event event) {
return;
}

LOGGER.info("Starting housekeeping activities");
final long startTimeNs = System.nanoTime();

try {
LockProvider.executeWithLock(HOUSEKEEPING_TASK_LOCK, (Runnable) this::informLocked);

final var taskDuration = Duration.ofNanos(System.nanoTime() - startTimeNs);
LOGGER.info("Housekeeping completed in %s".formatted(taskDuration));
} catch (Throwable t) {
LOGGER.error("Failed to complete housekeeping activities", t);
final var taskDuration = Duration.ofNanos(System.nanoTime() - startTimeNs);
LOGGER.error("Housekeeping failed to complete after %s".formatted(taskDuration), t);
}
}

/**
 * Runs all housekeeping activities while the distributed task lock is held.
 * <p>
 * Each activity is wrapped in its own try/catch so a failure of one activity
 * does not prevent the remaining activities from running.
 */
private void informLocked() {
    // NOTE(review): the scraped diff interleaved the pre- and post-commit statements
    // (e.g. both enforceBomUploadRetention() and performBomUploadHouseKeeping());
    // this reconstructs the post-commit version and fixes the "Failed perform" typo.
    try {
        performBomUploadHouseKeeping();
    } catch (IOException | RuntimeException e) {
        LOGGER.error("Failed to perform housekeeping of BOM uploads", e);
    }

    try {
        performVulnerabilityScanHouseKeeping();
    } catch (RuntimeException e) {
        LOGGER.error("Failed to perform housekeeping of vulnerability scans", e);
    }

    try {
        performWorkflowHouseKeeping();
    } catch (RuntimeException e) {
        LOGGER.error("Failed to perform housekeeping of workflows", e);
    }

    // TODO: Enforce retention for metrics?
    // TODO: Remove RepositoryMetaComponent records for which no matching Component exists anymore?
    // TODO: Remove IntegrityMetaComponent records for which no matching Component exists anymore?
    // TODO: Remove VulnerableSoftware records that are no longer associated with any vulnerability?
}

private void enforceBomUploadRetention() throws IOException {
private void performBomUploadHouseKeeping() throws IOException {
final Duration retentionDuration = Duration.parse(config.getProperty(BOM_UPLOAD_STORAGE_RETENTION_DURATION));
LOGGER.info("Deleting uploaded BOMs older than %s from storage".formatted(retentionDuration));

try (final var storage = pluginManager.getExtension(BomUploadStorage.class)) {
final int bomsDeleted = storage.deleteBomsForRetentionDuration(retentionDuration);
LOGGER.info("Deleted %s BOMs for retention duration %s"
.formatted(bomsDeleted == 0 ? "no" : bomsDeleted, retentionDuration));
if (bomsDeleted > 0) {
LOGGER.warn("Deleted %s BOM(s) for retention duration %s"
.formatted(bomsDeleted, retentionDuration));
}
}
}

private void enforceVulnerabilityScanRetention() {
private void performVulnerabilityScanHouseKeeping() {
final Duration retentionDuration = Duration.ofDays(1); // TODO: Make configurable?
LOGGER.info("Deleting vulnerability scans older than %s".formatted(retentionDuration));

final int scansDeleted = inJdbiTransaction(handle -> {
final var dao = handle.attach(VulnerabilityScanDao.class);
return dao.deleteAllForRetentionDuration(retentionDuration);
});
LOGGER.info("Deleted %s vulnerability scans for retention duration %s"
.formatted(scansDeleted == 0 ? "no" : scansDeleted, retentionDuration));
if (scansDeleted > 0) {
LOGGER.info("Deleted %s vulnerability scan(s) for retention duration %s"
.formatted(scansDeleted, retentionDuration));
}
}

/**
 * Performs housekeeping of workflow states in three stages:
 * <ol>
 *   <li>{@code PENDING} steps inactive for longer than the step timeout become {@code TIMED_OUT}</li>
 *   <li>{@code TIMED_OUT} steps inactive for longer than the step timeout become {@code FAILED},
 *       and all their child steps are transitioned to {@code CANCELLED}</li>
 *   <li>workflows whose latest update exceeds the retention duration are deleted</li>
 * </ol>
 * Both durations are read from application configuration.
 */
private void performWorkflowHouseKeeping() {
    final Duration timeoutDuration = Duration.parse(config.getProperty(WORKFLOW_STEP_TIMEOUT_DURATION));
    final Duration retentionDuration = Duration.parse(config.getProperty(WORKFLOW_RETENTION_DURATION));

    useJdbiHandle(handle -> {
        final var dao = handle.attach(WorkflowDao.class);

        // Stage 1: PENDING -> TIMED_OUT. Deliberately outside the transaction below.
        final int numTimedOut = dao.transitionAllPendingStepsToTimedOutForTimeout(timeoutDuration);
        if (numTimedOut > 0) {
            LOGGER.warn("Transitioned %d workflow step(s) from %s to %s for timeout %s"
                    .formatted(numTimedOut, WorkflowStatus.PENDING, WorkflowStatus.TIMED_OUT, timeoutDuration));
        }

        // Stage 2: TIMED_OUT -> FAILED plus cancellation of children; runs in a single
        // transaction so failed parents and cancelled children stay consistent.
        handle.useTransaction(ignored -> {
            final List<Long> failedStepIds = dao.transitionAllTimedOutStepsToFailedForTimeout(timeoutDuration);
            if (failedStepIds.isEmpty()) {
                return;
            }

            LOGGER.warn("Transitioned %d workflow step(s) from %s to %s for timeout %s"
                    .formatted(failedStepIds.size(), WorkflowStatus.TIMED_OUT, WorkflowStatus.FAILED, timeoutDuration));

            // Sum the per-batch-entry update counts to get the total number of cancelled steps.
            final int numCancelled = Arrays.stream(dao.cancelAllChildrenByParentStepIdAnyOf(failedStepIds)).sum();
            if (numCancelled > 0) {
                LOGGER.warn("Transitioned %d workflow step(s) to %s because their parent steps transitioned to %s"
                        .formatted(numCancelled, WorkflowStatus.CANCELLED, WorkflowStatus.FAILED));
            }
        });

        // Stage 3: drop whole workflows past retention (only those without PENDING/TIMED_OUT steps).
        final int numDeleted = dao.deleteAllForRetention(retentionDuration);
        if (numDeleted > 0) {
            LOGGER.info("Deleted %s workflow(s) that have not been updated within %s"
                    .formatted(numDeleted, retentionDuration));
        }
    });
}

}
1 change: 0 additions & 1 deletion src/main/java/org/dependencytrack/tasks/LockName.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public enum LockName {
EPSS_MIRROR_TASK_LOCK,
VULNERABILITY_METRICS_TASK_LOCK,
INTERNAL_COMPONENT_IDENTIFICATION_TASK_LOCK,
WORKFLOW_STEP_CLEANUP_TASK_LOCK,
PORTFOLIO_REPO_META_ANALYSIS_TASK_LOCK,
PORTFOLIO_VULN_ANALYSIS_TASK_LOCK,
INTEGRITY_META_INITIALIZER_LOCK,
Expand Down
Loading

0 comments on commit b15bd91

Please sign in to comment.