Skip to content

Commit

Permalink
Allow Task-types to specify a default priority (typically HIGH for re…
Browse files Browse the repository at this point in the history
…curring, and MEDIUM for the rest)
  • Loading branch information
kagkarlsson committed Sep 27, 2024
1 parent 8aed4fe commit fce9c49
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public Scheduler build() {
taskResolver,
schedulerName,
serializer,
enablePriority,
enablePriority,
clock);
final JdbcTaskRepository clientTaskRepository =
new JdbcTaskRepository(
Expand All @@ -266,7 +266,7 @@ public Scheduler build() {
taskResolver,
schedulerName,
serializer,
enablePriority,
enablePriority,
clock);

ExecutorService candidateExecutorService = executorService;
Expand Down Expand Up @@ -300,7 +300,7 @@ public Scheduler build() {
waiter.getWaitDuration().getSeconds(),
heartbeatInterval.getSeconds(),
enableImmediateExecution,
enablePriority,
enablePriority,
tableName,
schedulerName.getName());

Expand All @@ -326,7 +326,7 @@ public Scheduler build() {
startTasks,
candidateDueExecutor,
candidateHousekeeperExecutor,
enablePriority);
enablePriority);

if (enableImmediateExecution) {
scheduler.registerSchedulerListener(new ImmediateCheckForDueExecutions(scheduler, clock));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.github.kagkarlsson.scheduler.task;

import com.github.kagkarlsson.scheduler.task.TaskInstance.Builder;

public abstract class AbstractTask<T> implements Task<T> {

protected final String name;
Expand Down Expand Up @@ -48,12 +50,12 @@ public TaskInstance<T> instance(String id) {

@Override
public TaskInstance<T> instance(String id, T data) {
return instanceBuilder(id).data(data).build();
return instanceBuilder(id).priority(getDefaultPriority()).data(data).build();
}

@Override
public TaskInstance.Builder<T> instanceBuilder(String id) {
return new TaskInstance.Builder<>(this.name, id);
return new Builder<>(this.name, id);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) Gustav Karlsson
*
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler.task;

public class Priority {

public static final int HIGH = 90;
public static final int MEDIUM = 50;
public static final int LOW = 10;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public interface Task<T> extends ExecutionHandler<T>, HasTaskName {
default TaskInstanceId instanceId(String id) {
return TaskInstanceId.of(getName(), id);
}
;

SchedulableInstance<T> schedulableInstance(String id);

Expand All @@ -42,4 +41,8 @@ default TaskInstanceId instanceId(String id) {
default String getTaskName() {
return getName();
}

default int getDefaultPriority() {
return Priority.MEDIUM;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

public final class TaskInstance<T> implements TaskInstanceId {

private static final int DEFAULT_PRIORITY = 0;

private final String taskName;
private final String id;
private final Supplier<T> dataSupplier;
Expand All @@ -30,7 +28,7 @@ public TaskInstance(String taskName, String id) {
}

public TaskInstance(String taskName, String id, T data) {
this(taskName, id, () -> data, DEFAULT_PRIORITY);
this(taskName, id, () -> data, Priority.MEDIUM);
}

public TaskInstance(String taskName, String id, Supplier<T> dataSupplier, int priority) {
Expand Down Expand Up @@ -86,7 +84,7 @@ public static class Builder<T> {
private final String taskName;
private final String id;
private Supplier<T> dataSupplier = () -> (T) null;
private int priority = DEFAULT_PRIORITY;
private int priority = Priority.MEDIUM;

public Builder(String taskName, String id) {
this.id = id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@
import com.github.kagkarlsson.scheduler.task.OnStartup;
import com.github.kagkarlsson.scheduler.task.SchedulableInstance;
import com.github.kagkarlsson.scheduler.task.SchedulableTaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import java.time.Instant;
import java.util.function.Function;

public abstract class CustomTask<T> extends AbstractTask<T> implements OnStartup {
private ScheduleOnStartup<T> scheduleOnStartup;
private final NextExecutionTime defaultExecutionTime;
private ScheduleOnStartup<T> scheduleOnStartup;

public CustomTask(
String name,
Expand All @@ -44,13 +43,13 @@ public CustomTask(

@Override
public SchedulableInstance<T> schedulableInstance(String id) {
return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id), defaultExecutionTime);
return new SchedulableTaskInstance<>(instanceBuilder(id).build(), defaultExecutionTime);
}

@Override
public SchedulableInstance<T> schedulableInstance(String id, T data) {
return new SchedulableTaskInstance<>(
new TaskInstance<>(getName(), id, data), defaultExecutionTime);
instanceBuilder(id).data(data).build(), defaultExecutionTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ public OneTimeTask(

@Override
public SchedulableInstance<T> schedulableInstance(String id) {
return new SchedulableTaskInstance<>(
new TaskInstance<>(getName(), id), (currentTime) -> currentTime);
return new SchedulableTaskInstance<>(instanceBuilder(id).build(), (currentTime) -> currentTime);
}

@Override
public SchedulableInstance<T> schedulableInstance(String id, T data) {
return new SchedulableTaskInstance<>(
new TaskInstance<>(getName(), id, data), (currentTime) -> currentTime);
instanceBuilder(id).data(data).build(), (currentTime) -> currentTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public RecurringTask(
this.scheduleOnStartup = scheduleOnStartup;
}

@Override
public int getDefaultPriority() {
return Priority.HIGH;
}

@Override
public SchedulableInstance<T> schedulableInstance(String id) {
return new SchedulableTaskInstance<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public SchedulableInstance<T> schedulableInstance(String id) {
@Override
public SchedulableInstance<T> schedulableInstance(String id, T data) {
return new SchedulableTaskInstance<>(
new TaskInstance<>(getName(), id, data), data.getSchedule()::getInitialExecutionTime);
instanceBuilder(id).data(data).build(), data.getSchedule()::getInitialExecutionTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,22 +152,25 @@ public void get_due_should_be_sorted_by_priority() {
new SchedulableTaskInstance<>(
oneTimeTask.instanceBuilder("id1").priority(1).build(), now.minus(Duration.ofDays(1)));
SchedulableTaskInstance<Void> id2 =
new SchedulableTaskInstance<>(
oneTimeTask.instanceBuilder("id2").priority(10).build(), now.minus(Duration.ofDays(2)));
new SchedulableTaskInstance<>(
oneTimeTask.instanceBuilder("id2").priority(10).build(), now.minus(Duration.ofDays(2)));
SchedulableTaskInstance<Void> id3 =
new SchedulableTaskInstance<>(
oneTimeTask.instanceBuilder("id3").priority(5).build(), now.minus(Duration.ofDays(3)));
new SchedulableTaskInstance<>(
oneTimeTask.instanceBuilder("id3").priority(5).build(), now.minus(Duration.ofDays(3)));

Stream.of(id1, id2, id3).forEach(taskRepository::createIfNotExists);

List<String> orderedByPriority = taskRepository.getDue(now, POLLING_LIMIT, true).stream()
.map(Execution::getId).collect(Collectors.toList());
List<String> orderedByPriority =
taskRepository.getDue(now, POLLING_LIMIT, true).stream()
.map(Execution::getId)
.collect(Collectors.toList());
assertThat(orderedByPriority, contains("id2", "id3", "id1"));

List<String> orderedByExecutionTime = taskRepository.getDue(now, POLLING_LIMIT, false).stream()
.map(Execution::getId).collect(Collectors.toList());
List<String> orderedByExecutionTime =
taskRepository.getDue(now, POLLING_LIMIT, false).stream()
.map(Execution::getId)
.collect(Collectors.toList());
assertThat(orderedByExecutionTime, contains("id3", "id2", "id1"));

}

@Test
Expand Down

0 comments on commit fce9c49

Please sign in to comment.