From c8e56116347ce3b365d12ea317ad9113f25d1c4d Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Wed, 28 Aug 2024 17:48:41 -0400 Subject: [PATCH 1/3] Store the job's thread in `JobData`. Since we're storing more and more in `JobData`, and the background thread only needs three bits of that data, I added those to `ThreadEvent` so we don't have to pass the full object. This may improve performance in HTML5 specifically, where passing a class instance incurs an overhead. --- src/lime/system/ThreadPool.hx | 28 ++++++++++------------------ src/lime/system/WorkOutput.hx | 14 +++++++++++--- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index ec8669b295..8653f1f0a2 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -184,12 +184,7 @@ class ThreadPool extends WorkOutput private var __activeJobs:JobList; #if lime_threads - /** - The set of threads actively running a job. - **/ - private var __activeThreads:Map; - - /** + /** A list of idle threads. Not to be confused with `idleThreads`, a public variable equal to `__idleThreads.length`. **/ @@ -222,8 +217,7 @@ class ThreadPool extends WorkOutput #if lime_threads if (this.mode == MULTI_THREADED) { - __activeThreads = new Map(); - __idleThreads = []; + __idleThreads = []; } #end } @@ -249,7 +243,7 @@ class ThreadPool extends WorkOutput #if lime_threads if (mode == MULTI_THREADED) { - var thread:Thread = __activeThreads[job.id]; + var thread:Thread = job.thread; if (idleThreads < minThreads) { thread.sendMessage({event: CANCEL}); @@ -306,11 +300,10 @@ class ThreadPool extends WorkOutput public function cancelJob(jobID:Int):Bool { #if lime_threads - var thread:Thread = __activeThreads[jobID]; + var thread:Thread = __activeJobs.get(jobID).thread; if (thread != null) { thread.sendMessage({event: CANCEL}); - __activeThreads.remove(jobID); __idleThreads.push(thread); } #end @@ -410,7 +403,7 @@ class ThreadPool extends WorkOutput return; } - if (event.event != WORK || event.job == null) + if (event.event != WORK || event.doWork == null || event.id == null) { // Go idle. event = null; @@ -418,7 +411,7 @@ class ThreadPool extends WorkOutput } // Get to work. - output.activeJob = event.job; + output.activeJob = new JobData(event.doWork, event.state, event.id); var interruption:Dynamic = null; try @@ -426,7 +419,7 @@ class ThreadPool extends WorkOutput while (!output.__jobComplete.value && (interruption = Thread.readMessage(false)) == null) { output.workIterations.value++; - event.job.doWork.dispatch(event.job.state, output); + event.doWork.dispatch(event.state, output); } } catch (e:#if (haxe_ver >= 4.1) haxe.Exception #else Dynamic #end) @@ -495,8 +488,8 @@ class ThreadPool extends WorkOutput #end var thread:Thread = __idleThreads.length == 0 ? createThread(__executeThread) : __idleThreads.pop(); - __activeThreads[job.id] = thread; - thread.sendMessage({event: WORK, job: job}); + job.thread = thread; + thread.sendMessage({event: WORK, jobID: job.id, doWork: job.doWork, state: job.state}); } #end } @@ -582,8 +575,7 @@ class ThreadPool extends WorkOutput #if lime_threads if (mode == MULTI_THREADED) { - var thread:Thread = __activeThreads[activeJob.id]; - __activeThreads.remove(activeJob.id); + var thread:Thread = activeJob.thread; if (currentThreads > maxThreads || __jobQueue.length == 0 && currentThreads > minThreads) { diff --git a/src/lime/system/WorkOutput.hx b/src/lime/system/WorkOutput.hx index c2673703bd..be88303c3f 100644 --- a/src/lime/system/WorkOutput.hx +++ b/src/lime/system/WorkOutput.hx @@ -331,10 +331,15 @@ class JobData @:allow(lime.system.WorkOutput) private var startTime:Float = 0; + #if lime_threads @:allow(lime.system.WorkOutput) - private inline function new(doWork:WorkFunctionWorkOutput->Void>, state:State) + private var thread:Thread; + #end + + @:allow(lime.system.WorkOutput) + private inline function new(doWork:WorkFunctionWorkOutput->Void>, state:State, ?id:Int) { - id = nextID++; + this.id = id != null ? id : nextID++; this.doWork = doWork; this.state = state; } @@ -358,8 +363,11 @@ typedef ThreadEvent = { var event:ThreadEventType; @:optional var message:Dynamic; - @:optional var job:JobData; @:optional var jobID:Int; + + // Only for "WORK" events + @:optional var doWork:WorkFunctionWorkOutput->Void>; + @:optional var state:State; } class JSAsync From 08591487513ae9518599b04a30c0715efdd3722f Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Sun, 1 Sep 2024 18:14:55 -0400 Subject: [PATCH 2/3] Allow a single `ThreadPool` to run jobs in both modes. --- src/lime/system/ThreadPool.hx | 361 ++++++++++++++++------------------ src/lime/system/WorkOutput.hx | 24 +-- 2 files changed, 175 insertions(+), 210 deletions(-) diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index 8653f1f0a2..b707b0b2fa 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -89,9 +89,9 @@ class ThreadPool extends WorkOutput /** __Access this only from the main thread.__ - The sum of all active single-threaded pools' `workPriority` values. + The sum of `workPriority` values from all pools with an ongoing + single-threaded job. **/ - @:allow(lime.system.JobList) private static var __totalWorkPriority:Float = 0; /** @@ -181,17 +181,19 @@ class ThreadPool extends WorkOutput private var __doWork:WorkFunctionWorkOutput->Void>; - private var __activeJobs:JobList; - #if lime_threads - /** + /** A list of idle threads. Not to be confused with `idleThreads`, a public variable equal to `__idleThreads.length`. **/ - private var __idleThreads:Array; + private var __idleThreads:Array = []; + + private var __multiThreadedJobs:JobArray = []; + private var __multiThreadedQueue:JobArray = []; #end - private var __jobQueue:JobList = new JobList(); + private var __singleThreadedJob(default, set):JobData; + private var __singleThreadedQueue:JobArray = []; /** __Call this only from the main thread.__ @@ -201,25 +203,20 @@ class ThreadPool extends WorkOutput immediately; only after enough calls to `run()`. Only applies in multi-threaded mode. @param maxThreads The maximum number of threads that will run at once. - @param mode Defaults to `MULTI_THREADED` on most targets, but - `SINGLE_THREADED` in HTML5. In HTML5, `MULTI_THREADED` mode uses web - workers, which impose additional restrictions. + @param mode The mode jobs will run in by default. Defaults to + `SINGLE_THREADED` in HTML5 for backwards compatibility. **/ public function new(minThreads:Int = 0, maxThreads:Int = 1, mode:ThreadMode = null) { - super(mode); + if (!isMainThread()) + { + throw "Call new ThreadPool() only from the main thread."; + } - __activeJobs = new JobList(this); + super(mode); this.minThreads = minThreads; this.maxThreads = maxThreads; - - #if lime_threads - if (this.mode == MULTI_THREADED) - { - __idleThreads = []; - } - #end } /** @@ -237,10 +234,10 @@ class ThreadPool extends WorkOutput Application.current.onUpdate.remove(__update); + #if lime_threads // Cancel active jobs, leaving `minThreads` idle threads. - for (job in __activeJobs) + for (job in __multiThreadedJobs) { - #if lime_threads if (mode == MULTI_THREADED) { var thread:Thread = job.thread; @@ -254,7 +251,6 @@ class ThreadPool extends WorkOutput thread.sendMessage({event: EXIT}); } } - #end if (error != null) { @@ -268,9 +264,8 @@ class ThreadPool extends WorkOutput activeJob = null; } } - __activeJobs.clear(); + __multiThreadedJobs.clear(); - #if lime_threads // Exit idle threads if there are more than the minimum. while (idleThreads > minThreads) { @@ -278,16 +273,34 @@ class ThreadPool extends WorkOutput } #end - // Clear the job queue. + if (__singleThreadedJob != null && error != null) + { + activeJob = __singleThreadedJob; + onError.dispatch(error); + activeJob = null; + } + __singleThreadedJob = null; + + // Clear the job queues. if (error != null) { - for (job in __jobQueue) + for (job in __singleThreadedQueue) + { + activeJob = job; + onError.dispatch(error); + } + #if lime_threads + for (job in __multiThreadedQueue) { activeJob = job; onError.dispatch(error); } + #end } - __jobQueue.clear(); + __singleThreadedQueue.clear(); + #if lime_threads + __multiThreadedQueue.clear(); + #end __jobComplete.value = false; activeJob = null; @@ -299,16 +312,32 @@ class ThreadPool extends WorkOutput **/ public function cancelJob(jobID:Int):Bool { + if (__singleThreadedJob != null && __singleThreadedJob.id == jobID) + { + __singleThreadedJob = null; + return true; + } + else if (__singleThreadedQueue.removeJob(jobID) != null) + { + return true; + } + #if lime_threads - var thread:Thread = __activeJobs.get(jobID).thread; - if (thread != null) + var job:JobData = __multiThreadedJobs.removeJob(jobID); + if (job != null) { - thread.sendMessage({event: CANCEL}); - __idleThreads.push(thread); + if (job.thread != null) + { + job.thread.sendMessage({event: CANCEL}); + __idleThreads.push(job.thread); + } + return true; } - #end - return __activeJobs.remove(__activeJobs.get(jobID)) || __jobQueue.remove(__jobQueue.get(jobID)); + return __multiThreadedQueue.removeJob(jobID) != null; + #else + return false; + #end } /** @@ -327,9 +356,11 @@ class ThreadPool extends WorkOutput only access its arguments, and return often. @param state An object to pass to `doWork`, ideally a mutable object so that `doWork` can save its progress. + @param mode Which mode to run the job in. If omitted, the pool's default + mode will be used. @return The job's unique ID. **/ - public function run(doWork:WorkFunctionWorkOutput->Void> = null, state:State = null):Int + public function run(doWork:WorkFunctionWorkOutput->Void> = null, state:State = null, ?mode:ThreadMode = null):Int { if (!isMainThread()) { @@ -354,13 +385,24 @@ class ThreadPool extends WorkOutput } var job:JobData = new JobData(doWork, state); - __jobQueue.push(job); + #if lime_threads + if (mode == MULTI_THREADED || mode == null && this.mode == MULTI_THREADED) + { + __multiThreadedQueue.push(job); + } + else + #end + { + __singleThreadedQueue.push(job); + } if (!Application.current.onUpdate.has(__update)) { Application.current.onUpdate.add(__update); } + __startJobs(); + return job.id; } @@ -403,7 +445,7 @@ class ThreadPool extends WorkOutput return; } - if (event.event != WORK || event.doWork == null || event.id == null) + if (event.event != WORK || event.doWork == null || event.jobID == null) { // Go idle. event = null; @@ -411,7 +453,7 @@ class ThreadPool extends WorkOutput } // Get to work. - output.activeJob = new JobData(event.doWork, event.state, event.id); + output.activeJob = new JobData(event.doWork, event.state, event.jobID); var interruption:Dynamic = null; try @@ -462,50 +504,66 @@ class ThreadPool extends WorkOutput } /** - Schedules (in multi-threaded mode) or runs (in single-threaded mode) the - job queue, then processes incoming events. + Processes the job queues, starting any jobs that can be started. **/ - private function __update(deltaTime:Int):Void + private function __startJobs():Void { if (!isMainThread()) { return; } - // Process the queue. - while (__jobQueue.length > 0 && activeJobs < maxThreads) + if (__singleThreadedJob == null && __singleThreadedQueue.length > 0) { - var job:JobData = __jobQueue.pop(); - - job.startTime = timestamp(); - __activeJobs.push(job); + __singleThreadedJob = __singleThreadedQueue.shift(); + __singleThreadedJob.startTime = timestamp(); + } - #if lime_threads - if (mode == MULTI_THREADED) + #if lime_threads + for (job in __multiThreadedQueue) + { + if (__multiThreadedJobs.length >= maxThreads) { - #if html5 - job.doWork.makePortable(); - #end - - var thread:Thread = __idleThreads.length == 0 ? createThread(__executeThread) : __idleThreads.pop(); - job.thread = thread; - thread.sendMessage({event: WORK, jobID: job.id, doWork: job.doWork, state: job.state}); + break; } + + #if html5 + job.doWork.makePortable(); #end + + job.thread = __idleThreads.length == 0 ? createThread(__executeThread) : __idleThreads.pop(); + job.thread.sendMessage({event: WORK, jobID: job.id, doWork: job.doWork, state: job.state}); + job.startTime = timestamp(); + + __multiThreadedJobs.push(job); + __multiThreadedQueue.remove(job); } + #end + } - // Run the next single-threaded job, if any. - if (mode == SINGLE_THREADED && __activeJobs.hasNext()) + /** + Processes the job queues, then processes incoming events. + **/ + private function __update(deltaTime:Int):Void + { + if (!isMainThread()) { - activeJob = __activeJobs.next(); + return; + } + + __startJobs(); + + // Run the single-threaded job. + if (__singleThreadedJob != null) + { + activeJob = __singleThreadedJob; var state:State = activeJob.state; __jobComplete.value = false; workIterations.value = 0; - // `workLoad / frameRate` is the total time that pools may use per - // frame. `workPriority / __totalWorkPriority` is this pool's - // fraction of that total. + // `workLoad / frameRate` is the total time that pools may use per frame. + // `workPriority / __totalWorkPriority` is this pool's fraction of that total. var maxTimeElapsed:Float = workPriority * workLoad / (__totalWorkPriority * Application.current.window.frameRate); var startTime:Float = timestamp(); @@ -533,24 +591,32 @@ class ThreadPool extends WorkOutput var threadEvent:ThreadEvent; while ((threadEvent = __jobOutput.pop(false)) != null) { - if (threadEvent.jobID != null) + var activeJobMode:ThreadMode = SINGLE_THREADED; + if (__singleThreadedJob != null && threadEvent.jobID == __singleThreadedJob.id) { - activeJob = __activeJobs.get(threadEvent.jobID); + activeJob = __singleThreadedJob; } else { - activeJob = threadEvent.job; + #if lime_threads + activeJob = __multiThreadedJobs.getJob(threadEvent.jobID); + activeJobMode = MULTI_THREADED; + #else + continue; + #end } - if (activeJob == null || !__activeJobs.exists(activeJob)) + if (activeJob == null) { continue; } - if (mode == MULTI_THREADED) + #if lime_threads + if (activeJobMode == MULTI_THREADED) { activeJob.duration = timestamp() - activeJob.startTime; } + #end switch (threadEvent.event) { @@ -570,23 +636,25 @@ class ThreadPool extends WorkOutput onError.dispatch(threadEvent.message); } - __activeJobs.remove(activeJob); - #if lime_threads - if (mode == MULTI_THREADED) + if (activeJobMode == MULTI_THREADED) { - var thread:Thread = activeJob.thread; + __multiThreadedJobs.remove(activeJob); - if (currentThreads > maxThreads || __jobQueue.length == 0 && currentThreads > minThreads) + if (currentThreads > maxThreads || currentThreads - __multiThreadedQueue.length > minThreads) { - thread.sendMessage({event: EXIT}); + activeJob.thread.sendMessage({event: EXIT}); } else { - __idleThreads.push(thread); + __idleThreads.push(activeJob.thread); } } + else #end + { + __singleThreadedJob = null; + } default: } @@ -594,7 +662,7 @@ class ThreadPool extends WorkOutput activeJob = null; } - if (activeJobs == 0 && __jobQueue.length == 0) + if (0 == activeJobs + __singleThreadedQueue.length #if lime_threads + __multiThreadedQueue.length #end) { Application.current.onUpdate.remove(__update); } @@ -616,7 +684,8 @@ class ThreadPool extends WorkOutput private inline function get_activeJobs():Int { - return __activeJobs.length; + return #if lime_threads __multiThreadedJobs.length + #end + (__singleThreadedJob != null ? 1 : 0); } private inline function get_idleThreads():Int @@ -634,9 +703,22 @@ class ThreadPool extends WorkOutput return this; } + private inline function set___singleThreadedJob(value:JobData):JobData + { + if (value != null && __singleThreadedJob == null) + { + __totalWorkPriority += workPriority; + } + else if (value == null && __singleThreadedJob != null) + { + __totalWorkPriority -= workPriority; + } + return __singleThreadedJob = value; + } + private function set_workPriority(value:Float):Float { - if (mode == SINGLE_THREADED && activeJobs > 0) + if (__singleThreadedJob != null) { __totalWorkPriority += value - workPriority; } @@ -697,104 +779,21 @@ private abstract PseudoEvent(ThreadPool) from ThreadPool } } -class JobList +@:forward.new @:forward +private abstract JobArray(Array) from Array { - /** - * Whether `pool.workPriority` is being added to - * `ThreadPool.__totalWorkPriority`. Set this to true when `length > 0` and - * false when `length == 0`. The setter will ensure it is only added once. - */ - @:allow(lime.system.ThreadPool) - private var __addingWorkPriority(default, set):Bool; - - private var __index:Int = 0; - - private var __jobs:Array = []; - - public var length(get, never):Int; - - public var pool(default, null):ThreadPool; - - public inline function new(?pool:ThreadPool) - { - this.pool = pool; - @:bypassAccessor __addingWorkPriority = false; - } - public inline function clear():Void { #if haxe4 - __jobs.resize(0); + this.resize(0); #else - __jobs = []; + this.splice(0, this.length); #end - __addingWorkPriority = false; - } - - public inline function exists(job:JobData):Bool - { - return get(job.id) != null; } - public inline function hasNext():Bool + public function getJob(id:Int):JobData { - return __jobs.length > 0; - } - - /** - Iterates in an endless loop, starting over upon reaching the end. - **/ - public inline function next():JobData - { - __index++; - if (__index >= length) - { - __index = 0; - } - - return __jobs[__index]; - } - - public inline function pop():JobData - { - var job:JobData = __jobs.pop(); - __addingWorkPriority = length > 0; - return job; - } - - public function remove(job:JobData):Bool - { - if (__jobs.remove(job)) - { - __addingWorkPriority = length > 0; - return true; - } - else if (removeByID(job.id)) - { - return true; - } - else - { - return false; - } - } - - public inline function removeByID(id:Int):Bool - { - if (__jobs.remove(get(id))) - { - __addingWorkPriority = length > 0; - return true; - } - else - { - return false; - } - } - - public function get(id:Int):JobData - { - for (job in __jobs) + for (job in this) { if (job.id == id) { @@ -803,36 +802,18 @@ class JobList } return null; } - public inline function push(job:JobData):Void - { - __jobs.push(job); - __addingWorkPriority = true; - } - // Getters & Setters - - private inline function set___addingWorkPriority(value:Bool):Bool + public function removeJob(id:Int):JobData { - if (pool != null && __addingWorkPriority != value && ThreadPool.isMainThread()) + for (i in 0...this.length) { - if (value) - { - ThreadPool.__totalWorkPriority += pool.workPriority; - } - else + var job:JobData = this[i]; + if (job.id == id) { - ThreadPool.__totalWorkPriority -= pool.workPriority; + this.splice(i, 1); + return job; } - return __addingWorkPriority = value; - } - else - { - return __addingWorkPriority; } - } - - private inline function get_length():Int - { - return __jobs.length; + return null; } } diff --git a/src/lime/system/WorkOutput.hx b/src/lime/system/WorkOutput.hx index be88303c3f..49b7c8be1a 100644 --- a/src/lime/system/WorkOutput.hx +++ b/src/lime/system/WorkOutput.hx @@ -49,17 +49,10 @@ class WorkOutput public var workIterations(default, null):Tls = new Tls(); /** - Whether background threads are being/will be used. If threads aren't - available on this target, `mode` will always be `SINGLE_THREADED`. + The mode jobs will run in by default. If threads aren't available, jobs + will always run in `SINGLE_THREADED` mode. **/ - public var mode(get, never):ThreadMode; - - #if lime_threads - /** - __Set this only via the constructor.__ - **/ - private var __mode:ThreadMode; - #end + public var mode:ThreadMode; /** Messages sent by active jobs, received by the main thread. @@ -87,7 +80,7 @@ class WorkOutput __jobComplete.value = false; #if lime_threads - __mode = mode != null ? mode : #if html5 SINGLE_THREADED #else MULTI_THREADED #end; + this.mode = mode != null ? mode : #if html5 SINGLE_THREADED #else MULTI_THREADED #end; #end } @@ -185,15 +178,6 @@ class WorkOutput // Getters & Setters - private inline function get_mode():ThreadMode - { - #if lime_threads - return __mode; - #else - return SINGLE_THREADED; - #end - } - private inline function get_activeJob():JobData { return __activeJob.value; From 5e8ee13d12030dd7a6169718401c1bf4b6dde6c1 Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Mon, 2 Sep 2024 18:16:52 -0400 Subject: [PATCH 3/3] Remove unnecessary `@:forward.new`. --- src/lime/system/ThreadPool.hx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index b707b0b2fa..cc83ba8b5e 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -779,7 +779,7 @@ private abstract PseudoEvent(ThreadPool) from ThreadPool } } -@:forward.new @:forward +@:forward private abstract JobArray(Array) from Array { public inline function clear():Void