Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler Multithreaded Support + Refactor #361

Merged
merged 2 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class PriceInfoTooltip {
private static JsonObject lowestPricesJson;
private static JsonObject isMuseumJson;
private static JsonObject motesPricesJson;
private static boolean nullMsgSend = false;
private static volatile boolean nullMsgSend = false;
private final static Gson gson = new Gson();
private static final Map<String, String> apiAddresses;
private static long npcHash = 0;
Expand Down Expand Up @@ -395,7 +395,7 @@ public static void init() {
minute++;
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
.whenComplete((unused, throwable) -> nullMsgSend = false);
}, 1200);
}, 1200, true);
}

private static JsonObject downloadPrices(String type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void queueMessage(String message, int delay) {
}

@Override
protected boolean runTask(Runnable task) {
protected boolean runTask(Runnable task, boolean multithreaded) {
if (lastMessage + MIN_DELAY < System.currentTimeMillis()) {
task.run();
lastMessage = System.currentTimeMillis();
Expand Down
69 changes: 38 additions & 31 deletions src/main/java/de/hysky/skyblocker/utils/scheduler/Scheduler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.hysky.skyblocker.utils.scheduler;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mojang.brigadier.Command;
import it.unimi.dsi.fastutil.ints.AbstractInt2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
Expand All @@ -11,6 +12,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/**
Expand All @@ -21,19 +24,35 @@ public class Scheduler {
public static final Scheduler INSTANCE = new Scheduler();
private int currentTick = 0;
private final AbstractInt2ObjectMap<List<ScheduledTask>> tasks = new Int2ObjectOpenHashMap<>();
private final ExecutorService executors = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("Skyblocker-Scheduler-%d").build());

protected Scheduler() {
}

/**
* @see #schedule(Runnable, int, boolean)
*/
public void schedule(Runnable task, int delay) {
schedule(task, delay, false);
}

/**
* @see #scheduleCyclic(Runnable, int, boolean)
*/
public void scheduleCyclic(Runnable task, int period) {
scheduleCyclic(task, period, false);
}

/**
* Schedules a task to run after a delay.
*
* @param task the task to run
* @param delay the delay in ticks
* @param multithreaded whether to run the task on the schedulers dedicated thread pool
*/
public void schedule(Runnable task, int delay) {
public void schedule(Runnable task, int delay, boolean multithreaded) {
if (delay >= 0) {
addTask(new ScheduledTask(task), currentTick + delay);
addTask(new ScheduledTask(task, multithreaded), currentTick + delay);
} else {
LOGGER.warn("Scheduled a task with negative delay");
}
Expand All @@ -44,10 +63,11 @@ public void schedule(Runnable task, int delay) {
*
* @param task the task to run
* @param period the period in ticks
* @param multithreaded whether to run the task on the schedulers dedicated thread pool
*/
public void scheduleCyclic(Runnable task, int period) {
public void scheduleCyclic(Runnable task, int period, boolean multithreaded) {
if (period > 0) {
addTask(new CyclicTask(task, period), currentTick);
addTask(new ScheduledTask(task, period, true, multithreaded), currentTick);
} else {
LOGGER.error("Attempted to schedule a cyclic task with period lower than 1");
}
Expand All @@ -74,7 +94,7 @@ public void tick() {
//noinspection ForLoopReplaceableByForEach (or else we get a ConcurrentModificationException)
for (int i = 0; i < currentTickTasks.size(); i++) {
ScheduledTask task = currentTickTasks.get(i);
if (!runTask(task)) {
if (!runTask(task, task.multithreaded)) {
tasks.computeIfAbsent(currentTick + 1, key -> new ArrayList<>()).add(task);
}
}
Expand All @@ -89,8 +109,13 @@ public void tick() {
* @param task the task to run
* @return {@code true} if the task is run, and {@link false} if task is not run.
*/
protected boolean runTask(Runnable task) {
task.run();
protected boolean runTask(Runnable task, boolean multithreaded) {
if (multithreaded) {
executors.execute(task);
} else {
task.run();
}

return true;
}

Expand All @@ -105,36 +130,18 @@ private void addTask(ScheduledTask scheduledTask, int schedule) {
}

/**
* A task that runs every period ticks. More specifically, this task reschedules itself to run again after period ticks every time it runs.
* A task that that is scheduled to execute once after the {@code interval}, or that is run every {@code interval} ticks.
*/
protected class CyclicTask extends ScheduledTask {
private final int period;

CyclicTask(Runnable inner, int period) {
super(inner);
this.period = period;
protected record ScheduledTask(Runnable task, int interval, boolean cyclic, boolean multithreaded) implements Runnable {
private ScheduledTask(Runnable task, boolean multithreaded) {
this(task, -1, false, multithreaded);
}

@Override
public void run() {
super.run();
addTask(this, currentTick + period);
}
}
task.run();

/**
* A task that runs at a specific tick, relative to {@link #currentTick}.
*/
protected static class ScheduledTask implements Runnable {
private final Runnable inner;

public ScheduledTask(Runnable inner) {
this.inner = inner;
}

@Override
public void run() {
inner.run();
if (cyclic) INSTANCE.addTask(this, INSTANCE.currentTick + interval);
}
}
}