Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for querying externalExecutionId #900

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,6 +46,15 @@ public interface TaskExplorer {
*/
Page<TaskExecution> findRunningTaskExecutions(String taskName, Pageable pageable);

/**
* Retrieve a collection of taskExecutions that contain the provided external
* execution id.
* @param externalExecutionId the external execution id of the tasks
* @param pageable the constraints for the search
* @return the set of task executions for tasks with the external execution id
*/
Page<TaskExecution> findTaskExecutionsByExecutionId(String externalExecutionId, Pageable pageable);

/**
* Retrieve a list of available task names.
* @return the set of task names that have been executed
Expand All @@ -71,6 +80,13 @@ public interface TaskExplorer {
*/
long getRunningTaskExecutionCount();

/**
* Retrieves current number of task executions by external executionId.
* @param externalExecutionId The externalExecutionId to be searched.
* @return current number of task executions for a specific externalExecutionId.
*/
long getTaskExecutionCountByExternalExecutionId(String externalExecutionId);

/**
* Get a collection/page of executions.
* @param taskName the name of the task to be searched
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2021 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,6 +85,11 @@ public class JdbcTaskExecutionDao implements TaskExecutionDao {
*/
public static final String TASK_NAME_WHERE_CLAUSE = "where TASK_NAME = :taskName ";

/**
* WHERE clause for external execution id.
*/
public static final String EXTERNAL_EXECUTION_ID_WHERE_CLAUSE = "where EXTERNAL_EXECUTION_ID = :externalExecutionId ";

private static final String SAVE_TASK_EXECUTION = "INSERT into %PREFIX%EXECUTION"
+ "(TASK_EXECUTION_ID, EXIT_CODE, START_TIME, TASK_NAME, LAST_UPDATED, EXTERNAL_EXECUTION_ID, PARENT_EXECUTION_ID)"
+ "values (:taskExecutionId, :exitCode, :startTime, "
Expand Down Expand Up @@ -126,6 +131,9 @@ public class JdbcTaskExecutionDao implements TaskExecutionDao {
private static final String TASK_EXECUTION_COUNT_BY_NAME = "SELECT COUNT(*) FROM "
+ "%PREFIX%EXECUTION where TASK_NAME = :taskName";

private static final String TASK_EXECUTION_COUNT_BY_EXTERNAL_EXECUTION_ID = "SELECT COUNT(*) FROM "
+ "%PREFIX%EXECUTION where EXTERNAL_EXECUTION_ID = :externalExecutionId";

private static final String RUNNING_TASK_EXECUTION_COUNT_BY_NAME = "SELECT COUNT(*) FROM "
+ "%PREFIX%EXECUTION where TASK_NAME = :taskName AND END_TIME IS NULL ";

Expand Down Expand Up @@ -407,6 +415,27 @@ public Page<TaskExecution> findRunningTaskExecutions(String taskName, Pageable p
new MapSqlParameterSource("taskName", taskName), getRunningTaskExecutionCountByTaskName(taskName));
}

@Override
public Page<TaskExecution> findTaskExecutionsByExternalExecutionId(String externalExecutionId, Pageable pageable) {
return queryForPageableResults(pageable, SELECT_CLAUSE, FROM_CLAUSE, EXTERNAL_EXECUTION_ID_WHERE_CLAUSE,
new MapSqlParameterSource("externalExecutionId", externalExecutionId),
getTaskExecutionCountByExternalExecutionId(externalExecutionId));
}

@Override
public long getTaskExecutionCountByExternalExecutionId(String externalExecutionId) {
final MapSqlParameterSource queryParameters = new MapSqlParameterSource().addValue("externalExecutionId",
externalExecutionId, Types.VARCHAR);

try {
return this.jdbcTemplate.queryForObject(getQuery(TASK_EXECUTION_COUNT_BY_EXTERNAL_EXECUTION_ID),
queryParameters, Long.class);
}
catch (EmptyResultDataAccessException e) {
return 0;
}
}

@Override
public Page<TaskExecution> findTaskExecutionsByName(String taskName, Pageable pageable) {
return queryForPageableResults(pageable, SELECT_CLAUSE, FROM_CLAUSE, TASK_NAME_WHERE_CLAUSE,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -128,6 +128,17 @@ public long getTaskExecutionCountByTaskName(String taskName) {
return count;
}

@Override
public long getTaskExecutionCountByExternalExecutionId(String externalExecutionId) {
int count = 0;
for (Map.Entry<Long, TaskExecution> entry : this.taskExecutions.entrySet()) {
if (entry.getValue().getExternalExecutionId().equals(externalExecutionId)) {
count++;
}
}
return count;
}

@Override
public long getRunningTaskExecutionCountByTaskName(String taskName) {
int count = 0;
Expand Down Expand Up @@ -166,6 +177,18 @@ public Page<TaskExecution> findRunningTaskExecutions(String taskName, Pageable p
return getPageFromList(new ArrayList<>(result), pageable, getRunningTaskExecutionCountByTaskName(taskName));
}

@Override
public Page<TaskExecution> findTaskExecutionsByExternalExecutionId(String externalExecutionId, Pageable pageable) {
Set<TaskExecution> result = getTaskExecutionTreeSet();
for (Map.Entry<Long, TaskExecution> entry : this.taskExecutions.entrySet()) {
if (entry.getValue().getExternalExecutionId().equals(externalExecutionId)) {
result.add(entry.getValue());
}
}
return getPageFromList(new ArrayList<>(result), pageable,
getTaskExecutionCountByExternalExecutionId(externalExecutionId));
}

@Override
public Page<TaskExecution> findTaskExecutionsByName(String taskName, Pageable pageable) {
Set<TaskExecution> filteredSet = getTaskExecutionTreeSet();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -150,6 +150,24 @@ void completeTaskExecution(long executionId, Integer exitCode, LocalDateTime end
*/
Page<TaskExecution> findRunningTaskExecutions(String taskName, Pageable pageable);

/**
* Retrieve a collection of taskExecutions that contain the provided external
* execution id.
* @param externalExecutionId the external execution id of the tasks
* @param pageable the constraints for the search
* @return the set of task executions for tasks with the externalExecutionId
*/
Page<TaskExecution> findTaskExecutionsByExternalExecutionId(String externalExecutionId, Pageable pageable);

/**
* Retrieves current number of task executions for a externalTaskExecutionId.
* @param externalExecutionId the external execution id of the task to search for in
* the repository.
* @return current number of task executions for the externalExecutionId.
*/

long getTaskExecutionCountByExternalExecutionId(String externalExecutionId);

/**
* Retrieves a subset of task executions by task name, start location and size.
* @param taskName the name of the task to search for in the repository.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,6 +59,11 @@ public Page<TaskExecution> findRunningTaskExecutions(String taskName, Pageable p
return this.taskExecutionDao.findRunningTaskExecutions(taskName, pageable);
}

@Override
public Page<TaskExecution> findTaskExecutionsByExecutionId(String externalExecutionId, Pageable pageable) {
return this.taskExecutionDao.findTaskExecutionsByExternalExecutionId(externalExecutionId, pageable);
}

@Override
public List<String> getTaskNames() {
return this.taskExecutionDao.getTaskNames();
Expand All @@ -79,6 +84,11 @@ public long getRunningTaskExecutionCount() {
return this.taskExecutionDao.getRunningTaskExecutionCount();
}

@Override
public long getTaskExecutionCountByExternalExecutionId(String externalExecutionId) {
return this.taskExecutionDao.getTaskExecutionCountByExternalExecutionId(externalExecutionId);
}

@Override
public Page<TaskExecution> findTaskExecutionsByName(String taskName, Pageable pageable) {
return this.taskExecutionDao.findTaskExecutionsByName(taskName, pageable);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -151,7 +151,7 @@ public void findRunningTasks(DaoType testType) {
final int COMPLETE_COUNT = 5;

Map<Long, TaskExecution> expectedResults = new HashMap<>();
// Store completed jobs
// Store completed task executions
int i = 0;
for (; i < COMPLETE_COUNT; i++) {
createAndSaveTaskExecution(i);
Expand All @@ -178,6 +178,55 @@ public void findRunningTasks(DaoType testType) {
}
}

@ParameterizedTest
@MethodSource("data")
public void findTasksByExternalExecutionId(DaoType testType) {
testDefaultContext(testType);
Map<Long, TaskExecution> sampleDataSet = createSampleDataSet(33);
sampleDataSet.values().forEach(taskExecution -> {
Page<TaskExecution> taskExecutionsByExecutionId = this.taskExplorer
.findTaskExecutionsByExecutionId(taskExecution.getExternalExecutionId(), PageRequest.of(0, 5));
assertThat(taskExecutionsByExecutionId.getTotalElements()).isEqualTo(1);
assertThat(this.taskExplorer
.getTaskExecutionCountByExternalExecutionId(taskExecution.getExternalExecutionId())).isEqualTo(1);
TaskExecution resultTaskExecution = taskExecutionsByExecutionId.getContent().get(0);
assertThat(resultTaskExecution.getExecutionId()).isEqualTo(taskExecution.getExecutionId());
});
}

@ParameterizedTest
@MethodSource("data")
public void findTasksByExternalExecutionIdMultipleEntry(DaoType testType) {
testDefaultContext(testType);

testDefaultContext(testType);
final int SAME_EXTERNAL_ID_COUNT = 2;
final int UNIQUE_COUNT = 3;

Map<Long, TaskExecution> expectedResults = new HashMap<>();
// Store task executions each with a unique external execution id
int i = 0;
for (; i < UNIQUE_COUNT; i++) {
createAndSaveTaskExecution(i);
}
// Create task execution with same external execution id
for (; i < (UNIQUE_COUNT + SAME_EXTERNAL_ID_COUNT); i++) {
TaskExecution expectedTaskExecution = this.taskRepository.createTaskExecution(getSimpleTaskExecution());
expectedResults.put(expectedTaskExecution.getExecutionId(), expectedTaskExecution);
}
Pageable pageable = PageRequest.of(0, 10);
Page<TaskExecution> resultSet = this.taskExplorer.findTaskExecutionsByExecutionId(EXTERNAL_EXECUTION_ID,
pageable);
assertThat(resultSet.getTotalElements()).isEqualTo(SAME_EXTERNAL_ID_COUNT);
List<TaskExecution> taskExecutions = resultSet.getContent();
taskExecutions.forEach(taskExecution -> {
assertThat(expectedResults.keySet()).contains(taskExecution.getExecutionId());
});
assertThat(this.taskExplorer.getTaskExecutionCountByExternalExecutionId(EXTERNAL_EXECUTION_ID))
.isEqualTo(SAME_EXTERNAL_ID_COUNT);

}

@ParameterizedTest
@MethodSource("data")
public void findTasksByName(DaoType testType) {
Expand All @@ -186,7 +235,7 @@ public void findTasksByName(DaoType testType) {
final int COMPLETE_COUNT = 7;

Map<Long, TaskExecution> expectedResults = new HashMap<>();
// Store completed jobs
// Store completed task executions
for (int i = 0; i < COMPLETE_COUNT; i++) {
createAndSaveTaskExecution(i);
}
Expand Down