-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' of github.com:scalableminds/webknossos into doc…
…s_lili * 'master' of github.com:scalableminds/webknossos: Add loaded meshes to sharing link (#5993)
- Loading branch information
Showing
50 changed files
with
1,048 additions
and
603 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,36 +1,51 @@ | ||
// @flow | ||
import { type Saga, type Task, join, call, fork } from "oxalis/model/sagas/effect-generators"; | ||
|
||
export default function processTaskWithPool<T>( | ||
tasks: Array<() => Promise<T>>, | ||
export default function* processTaskWithPool( | ||
tasks: Array<() => Saga<void>>, | ||
poolSize: number, | ||
): Promise<Array<T>> { | ||
return new Promise((resolve, reject) => { | ||
const promises = []; | ||
let isFinalResolveScheduled = false; | ||
): Saga<void> { | ||
const startedTasks: Array<Task<void>> = []; | ||
let isFinalResolveScheduled = false; | ||
let error = null; | ||
|
||
const startNextTask = () => { | ||
if (tasks.length === 0) { | ||
if (!isFinalResolveScheduled) { | ||
isFinalResolveScheduled = true; | ||
function* forkSafely(fn): Saga<void> { | ||
// Errors from forked tasks cannot be caught, see https://redux-saga.js.org/docs/advanced/ForkModel/#error-propagation | ||
// However, the task pool should not abort if a single task fails. | ||
// Therefore, use this wrapper to safely execute all tasks and possibly rethrow the last error in the end. | ||
try { | ||
yield* call(fn); | ||
} catch (e) { | ||
error = e; | ||
} | ||
} | ||
|
||
function* startNextTask(): Saga<void> { | ||
if (tasks.length === 0) { | ||
if (!isFinalResolveScheduled) { | ||
isFinalResolveScheduled = true; | ||
|
||
// All tasks were kicked off, which is why all promises can be | ||
// awaited now together. | ||
Promise.all(promises).then(resolve, reject); | ||
} | ||
return; | ||
// All tasks were kicked off, which is why all tasks can be | ||
// awaited now together. | ||
yield* join(startedTasks); | ||
if (error != null) throw error; | ||
} | ||
return; | ||
} | ||
|
||
const task = tasks.shift(); | ||
const newPromise = task(); | ||
promises.push(newPromise); | ||
const task = tasks.shift(); | ||
const newTask = yield* fork(forkSafely, task); | ||
startedTasks.push(newTask); | ||
|
||
// If that promise is done, process a new one (that way, | ||
// the pool size stays constant until the queue is almost empty.) | ||
newPromise.then(startNextTask, startNextTask); | ||
}; | ||
// If that task is done, process a new one (that way, | ||
// the pool size stays constant until the queue is almost empty.) | ||
yield* join(newTask); | ||
yield* call(startNextTask); | ||
} | ||
|
||
for (let i = 0; i < poolSize; i++) { | ||
startNextTask(); | ||
} | ||
}); | ||
for (let i = 0; i < poolSize; i++) { | ||
yield* fork(startNextTask); | ||
} | ||
// The saga will wait for all forked tasks to terminate before returning, because | ||
// fork() creates attached forks (in contrast to spawn()). | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.