Transactional saving of volume annotations #7264

Merged (24 commits, Aug 21, 2023)
The diff below shows changes from 3 of the 24 commits.

Commits
f33455e
create volume transactions according to debounced pushqueue.push; inc…
philippotto Aug 14, 2023
1e135bf
Merge branch 'master' of github.com:scalableminds/webknossos into vol…
philippotto Aug 14, 2023
0c444ab
make PushQueue.push more robust by avoiding concurrent execution of i…
philippotto Aug 14, 2023
d29e86b
Revert "Revert "temporarily disable most CI checks""
philippotto Jun 27, 2023
da39e1d
don't use AsyncTaskQueue in pushqueue anymore
philippotto Aug 15, 2023
9ca1834
remove AsyncTaskQueue implementation + specs
philippotto Aug 15, 2023
a596752
implement small AsyncFifoResolver to prevent theoretical race condition
philippotto Aug 15, 2023
bd5d355
ensure that the save saga consumes N items from the save queue where …
philippotto Aug 15, 2023
b0254a4
fix tests
philippotto Aug 15, 2023
8a5d387
fix accidentally skipped tests; improve linting rule to avoid this; f…
philippotto Aug 15, 2023
a2574b8
harden error handling in PushQueue
philippotto Aug 15, 2023
e954202
move some lib modules into libs/async
philippotto Aug 15, 2023
db611cc
warn user when pushqueue is starving
philippotto Aug 15, 2023
e3228df
Apply suggestions from code review
philippotto Aug 15, 2023
31df268
clean up a bit
philippotto Aug 15, 2023
1e90b3d
tune batch count constants for volume tracings; also show downloading…
philippotto Aug 16, 2023
9111358
fix race condition in AsyncFifoResolver
philippotto Aug 16, 2023
5dceb0c
fix incorrect dtype in comment
philippotto Aug 16, 2023
e1f4a53
update changelog
philippotto Aug 16, 2023
3799b22
improve comment
philippotto Aug 16, 2023
48b19c4
rename pendingQueue to pendingBuckets
philippotto Aug 16, 2023
f4b6c89
fix incorrect method name
philippotto Aug 16, 2023
5a0e630
Merge branch 'master' of github.com:scalableminds/webknossos into vol…
philippotto Aug 17, 2023
7205d46
Merge branch 'master' of github.com:scalableminds/webknossos into vol…
philippotto Aug 21, 2023
95 changes: 95 additions & 0 deletions frontend/javascripts/libs/debounced_abortable_saga.ts
@@ -0,0 +1,95 @@
import { call, type Saga } from "oxalis/model/sagas/effect-generators";
import { buffers, Channel, channel, runSaga } from "redux-saga";
import { delay, race, take } from "redux-saga/effects";

/*
* This function takes a saga and a debounce threshold
* and returns a function F that will trigger the given saga
* in a debounced manner.
* In contrast to a normal debouncing mechanism, the saga
* will be cancelled if F is called while the saga is running.
* Note that this means that concurrent executions of the saga
* are impossible by design.
*
* Also note that this debouncing mechanism is slower than a
* standard _.debounce. See debounced_abortable_saga.spec.ts
* for a small benchmark.
*/
export function createDebouncedAbortableCallable<T, C>(
fn: (param1: T) => Saga<void>,
debounceThreshold: number,
context: C,
) {
// The communication with the saga is done via a channel.
// That way, we can expose a normal function that
// does the triggering by filling the channel.

// Only the most recent invocation should survive.
// Therefore, create a sliding buffer with size 1.
const buffer = buffers.sliding<T>(1);
const triggerChannel: Channel<T> = channel<T>(buffer);

const _task = runSaga(
{},
debouncedAbortableSagaRunner,
debounceThreshold,
triggerChannel,
// @ts-expect-error TS thinks fn doesn't match, but it does.
fn,
context,
);

return (msg: T) => {
triggerChannel.put(msg);
};
}

export function createDebouncedAbortableParameterlessCallable<C>(
fn: () => Saga<void>,
debounceThreshold: number,
context: C,
) {
const wrappedFn = createDebouncedAbortableCallable(fn, debounceThreshold, context);
const dummyParameter = {};
return () => {
wrappedFn(dummyParameter);
};
}

function* debouncedAbortableSagaRunner<T, C>(
debounceThreshold: number,
triggerChannel: Channel<T>,
abortableFn: (param: T) => Saga<void>,
context: C,
): Saga<void> {
while (true) {
// Wait for a trigger-call by consuming
// the channel.
let msg = yield take(triggerChannel);

// Repeatedly try to execute abortableFn (each try
// might be cancelled due to new initiation-requests)
while (true) {
const { debounced, latestMessage } = yield race({
debounced: delay(debounceThreshold),
latestMessage: take(triggerChannel),
});

if (latestMessage) {
msg = latestMessage;
}

if (debounced) {
const { abortingMessage } = yield race({
finished: call([context, abortableFn], msg),
abortingMessage: take(triggerChannel),
});
if (abortingMessage) {
msg = abortingMessage;
} else {
break;
}
}
}
}
}
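
To make the behavior concrete, here is a minimal usage sketch of createDebouncedAbortableCallable. The uploadSnapshot saga, the sendSnapshot helper, and the null context are hypothetical and only serve as illustration:

import { call } from "redux-saga/effects";
import type { Saga } from "oxalis/model/sagas/effect-generators";
import { createDebouncedAbortableCallable } from "libs/debounced_abortable_saga";

// Hypothetical async helper; only its shape matters here.
declare function sendSnapshot(snapshotId: string): Promise<void>;

// Hypothetical saga that may be aborted while it is running.
function* uploadSnapshot(snapshotId: string): Saga<void> {
  yield call(sendSnapshot, snapshotId);
}

// triggerUpload is a plain function. It waits for 1000 ms of "silence" before
// running the saga; calling it again while the saga runs aborts that run and
// restarts with the most recent argument.
const triggerUpload = createDebouncedAbortableCallable(uploadSnapshot, 1000, null);

triggerUpload("snapshot-1");
triggerUpload("snapshot-2"); // supersedes "snapshot-1"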
13 changes: 5 additions & 8 deletions frontend/javascripts/libs/task_pool.ts
@@ -1,5 +1,5 @@
import type { Saga, Task } from "oxalis/model/sagas/effect-generators";
import { join, call, fork } from "typed-redux-saga";
import type { Saga } from "oxalis/model/sagas/effect-generators";
import { join, call, fork, FixedTask } from "typed-redux-saga";

/*
Given an array of async tasks, processTaskWithPool
@@ -10,12 +10,11 @@ export default function* processTaskWithPool(
tasks: Array<() => Saga<void>>,
poolSize: number,
): Saga<void> {
const startedTasks: Array<Task<void>> = [];
const startedTasks: Array<FixedTask<void>> = [];
let isFinalResolveScheduled = false;
let error: Error | null = null;

// @ts-expect-error ts-migrate(7006) FIXME: Parameter 'fn' implicitly has an 'any' type.
function* forkSafely(fn): Saga<void> {
function* forkSafely(fn: () => Saga<void>): Saga<void> {
// Errors from forked tasks cannot be caught, see https://redux-saga.js.org/docs/advanced/ForkModel/#error-propagation
// However, the task pool should not abort if a single task fails.
// Therefore, use this wrapper to safely execute all tasks and possibly rethrow the last error in the end.
@@ -32,17 +31,15 @@ export default function* processTaskWithPool(
isFinalResolveScheduled = true;
// All tasks have been kicked off, which is why they can now be
// awaited together.
// @ts-expect-error ts-migrate(2769) FIXME: No overload matches this call.
yield* join(startedTasks);
if (error != null) throw error;
}

return;
}

const task = tasks.shift();
const task = tasks.shift() as () => Saga<void>;
const newTask = yield* fork(forkSafely, task);
// @ts-expect-error ts-migrate(2345) FIXME: Argument of type 'FixedTask<void>' is not assignab... Remove this comment to see the full error message
startedTasks.push(newTask);
// If that task is done, process a new one (that way,
// the pool size stays constant until the queue is almost empty.)
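For reference, a minimal usage sketch of processTaskWithPool after this change; compressBucket and the task count are hypothetical:

import { call } from "typed-redux-saga";
import type { Saga } from "oxalis/model/sagas/effect-generators";
import processTaskWithPool from "libs/task_pool";

// Hypothetical unit of async work.
declare function compressBucket(bucketIndex: number): Promise<void>;

function* compressAllBuckets(): Saga<void> {
  // Wrap each unit of work in a parameterless saga...
  const tasks = Array.from({ length: 100 }, (_, i) =>
    function* (): Saga<void> {
      yield* call(compressBucket, i);
    },
  );
  // ...and let the pool run at most 5 of them concurrently.
  yield* call(processTaskWithPool, tasks, 5);
}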
2 changes: 1 addition & 1 deletion frontend/javascripts/libs/worker_pool.ts
@@ -5,7 +5,7 @@ export default class WorkerPool<P, R> {
//
// Example:
// const compressionPool = new WorkerPool(
// () => createWorker(ByteArrayToLz4Base64Worker),
// () => createWorker(ByteArraysToLz4Base64Worker),
// COMPRESSION_WORKER_COUNT,
// );
// const promise1 = compressionPool.submit(data1);
frontend/javascripts/oxalis/model/bucket_data_handling/data_cube.ts
@@ -679,6 +679,7 @@ class DataCube {
}

triggerPushQueue() {
console.log("triggerPushQueue");
this.pushQueue.push();
}

58 changes: 29 additions & 29 deletions frontend/javascripts/oxalis/model/bucket_data_handling/pushqueue.ts
@@ -5,17 +5,18 @@ import { sendToStore } from "oxalis/model/bucket_data_handling/wkstore_adapter";
import AsyncTaskQueue from "libs/async_task_queue";
import type DataCube from "oxalis/model/bucket_data_handling/data_cube";
import Toast from "libs/toast";
import { createDebouncedAbortableParameterlessCallable } from "libs/debounced_abortable_saga";
import { call } from "redux-saga/effects";
export const COMPRESSING_BATCH_SIZE = 32;
// Only process the PushQueue after there was no user interaction
// for PUSH_DEBOUNCE_TIME milliseconds...
// Only process the PushQueue after there was no user interaction (or bucket modification due to
// downsampling) for PUSH_DEBOUNCE_TIME milliseconds...
const PUSH_DEBOUNCE_TIME = 1000;
// ...unless a timeout of PUSH_DEBOUNCE_MAX_WAIT_TIME milliseconds
// is exceeded. Then, initiate a push.
const PUSH_DEBOUNCE_MAX_WAIT_TIME = 30000;
// todo: reactivate?
const _PUSH_DEBOUNCE_MAX_WAIT_TIME = 30000;

class PushQueue {
// @ts-expect-error ts-migrate(2564) FIXME: Property 'dataSetName' has no initializer and is n... Remove this comment to see the full error message
dataSetName: string;
cube: DataCube;
compressionTaskQueue: AsyncTaskQueue;
sendData: boolean;
@@ -65,7 +66,6 @@ class PushQueue {
this.pendingQueue.add(bucket);
bucket.dirtyCount++;
}

this.push();
}

@@ -77,41 +77,41 @@ class PushQueue {
this.pendingQueue.forEach((e) => console.log(e));
}

pushImpl = async () => {
await this.cube.temporalBucketManager.getAllLoadedPromise();
pushImpl = function* (this: PushQueue) {
try {
console.log("pushImpl start");
yield call(this.cube.temporalBucketManager.getAllLoadedPromise);

if (!this.sendData) {
return;
}
if (!this.sendData) {
return;
}

while (this.pendingQueue.size) {
let batchSize = Math.min(COMPRESSING_BATCH_SIZE, this.pendingQueue.size);
const batch: DataBucket[] = [];
console.log("this.pendingQueue.size", this.pendingQueue.size);

for (const bucket of this.pendingQueue) {
if (batchSize <= 0) break;
this.pendingQueue.delete(bucket);
batch.push(bucket);
batchSize--;
}
// Flush pendingQueue. Note that it's important to do this synchronously.
// If other actors could add to the queue concurrently, the front-end could
// send an inconsistent state for a transaction.
const batch: DataBucket[] = Array.from(this.pendingQueue);
this.pendingQueue = new Set();

// fire and forget
this.compressionTaskQueue.scheduleTask(() => this.pushBatch(batch));
}

try {
// wait here
await this.compressionTaskQueue.join();
this.pushBatch(batch);
} catch (_error) {
// todo: somewhere else?
alert("We've encountered a permanent issue while saving. Please try to reload the page.");
}
console.log("pushImpl end");
};

push = _.debounce(this.pushImpl, PUSH_DEBOUNCE_TIME, {
maxWait: PUSH_DEBOUNCE_MAX_WAIT_TIME,
});
// push = _.debounce(this.pushImpl, PUSH_DEBOUNCE_TIME, {
// maxWait: PUSH_DEBOUNCE_MAX_WAIT_TIME,
// });

// todo: prevent user from brushing for eternity?
push = createDebouncedAbortableParameterlessCallable(this.pushImpl, PUSH_DEBOUNCE_TIME, this);

pushBatch(batch: Array<DataBucket>): Promise<void> {
// The batch will be put into one transaction.
return sendToStore(batch, this.cube.layerName);
}
}
@@ -51,9 +51,9 @@ class TemporalBucketManager {
return loadedPromise;
}

async getAllLoadedPromise(): Promise<void> {
getAllLoadedPromise = async () => {
await Promise.all(this.loadedPromises);
}
};
}

export default TemporalBucketManager;
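
Presumably, getAllLoadedPromise becomes an arrow-function property so that it can be handed to redux-saga's call as a bare reference (as pushImpl does above) without losing its this binding. A small illustration with a hypothetical class:

class Example {
  loadedPromises: Array<Promise<void>> = [];

  // Regular method: `this` is lost when the function is detached from the instance.
  async getAllLoadedAsMethod(): Promise<void> {
    await Promise.all(this.loadedPromises);
  }

  // Arrow-function property: `this` is captured when the instance is constructed.
  getAllLoadedAsArrow = async () => {
    await Promise.all(this.loadedPromises);
  };
}

const example = new Example();
const detachedMethod = example.getAllLoadedAsMethod;
const detachedArrow = example.getAllLoadedAsArrow;
// detachedMethod() rejects with a TypeError because `this` is undefined,
// while detachedArrow() resolves. The latter is what
// yield call(this.cube.temporalBucketManager.getAllLoadedPromise) relies on.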
frontend/javascripts/oxalis/model/bucket_data_handling/wkstore_adapter.ts
@@ -13,7 +13,7 @@ import { parseAsMaybe } from "libs/utils";
import { pushSaveQueueTransaction } from "oxalis/model/actions/save_actions";
import type { UpdateAction } from "oxalis/model/sagas/update_actions";
import { updateBucket } from "oxalis/model/sagas/update_actions";
import ByteArrayToLz4Base64Worker from "oxalis/workers/byte_array_to_lz4_base64.worker";
import ByteArraysToLz4Base64Worker from "oxalis/workers/byte_arrays_to_lz4_base64.worker";
import DecodeFourBitWorker from "oxalis/workers/decode_four_bit.worker";
import ErrorHandling from "libs/error_handling";
import Request from "libs/request";
@@ -25,13 +25,20 @@ import constants, { MappingStatusEnum } from "oxalis/constants";
import window from "libs/window";
import { getGlobalDataConnectionInfo } from "../data_connection_info";
import { ResolutionInfo } from "../helpers/resolution_info";
import _ from "lodash";

const decodeFourBit = createWorker(DecodeFourBitWorker);

// For 32-bit buckets with 32^3 voxels, a COMPRESSION_BATCH_SIZE of
// 128 corresponds to 16.8 MB that is sent to a webworker in one go.
const COMPRESSION_BATCH_SIZE = 128;
const COMPRESSION_WORKER_COUNT = 2;
const compressionPool = new WorkerPool(
() => createWorker(ByteArrayToLz4Base64Worker),
() => createWorker(ByteArraysToLz4Base64Worker),
COMPRESSION_WORKER_COUNT,
);

export const REQUEST_TIMEOUT = 60000;
export type SendBucketInfo = {
position: Vector3;
@@ -239,14 +246,25 @@ function sliceBufferIntoPieces(
}

export async function sendToStore(batch: Array<DataBucket>, tracingId: string): Promise<void> {
const items: Array<UpdateAction> = await Promise.all(
batch.map(async (bucket): Promise<UpdateAction> => {
const data = bucket.getCopyOfData();
const bucketInfo = createSendBucketInfo(bucket.zoomedAddress, bucket.cube.resolutionInfo);
const byteArray = new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
const compressedBase64 = await compressionPool.submit(byteArray);
return updateBucket(bucketInfo, compressedBase64);
}),
const items: Array<UpdateAction> = _.flatten(
await Promise.all(
_.chunk(batch, COMPRESSION_BATCH_SIZE).map(async (batchSubset): Promise<UpdateAction[]> => {
const byteArrays = [];
for (const bucket of batchSubset) {
const data = bucket.getCopyOfData();
const byteArray = new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
byteArrays.push(byteArray);
}

const compressedBase64Strings = await compressionPool.submit(byteArrays);
return compressedBase64Strings.map((compressedBase64, index) => {
const bucket = batchSubset[index];
const bucketInfo = createSendBucketInfo(bucket.zoomedAddress, bucket.cube.resolutionInfo);
return updateBucket(bucketInfo, compressedBase64);
});
}),
),
);

Store.dispatch(pushSaveQueueTransaction(items, "volume", tracingId));
}
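
As a quick sanity check of the batch-size comment above, assuming 32-bit (4-byte) voxels:

const VOXELS_PER_BUCKET = 32 ** 3;              // 32,768 voxels per bucket
const BYTES_PER_BUCKET = VOXELS_PER_BUCKET * 4; // 131,072 bytes per 32-bit bucket
const BYTES_PER_BATCH = BYTES_PER_BUCKET * 128; // 16,777,216 bytes, i.e. roughly 16.8 MB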
2 changes: 2 additions & 0 deletions frontend/javascripts/oxalis/model/sagas/save_saga.ts
@@ -154,6 +154,7 @@ let didShowFailedSimultaneousTracingError = false;
export function* sendRequestToServer(saveQueueType: SaveQueueType, tracingId: string): Saga<void> {
const fullSaveQueue = yield* select((state) => selectQueue(state, saveQueueType, tracingId));
const saveQueue = sliceAppropriateBatchCount(fullSaveQueue);
console.log("saving", saveQueue.length, "items out of", fullSaveQueue.length);
let compactedSaveQueue = compactSaveQueue(saveQueue);
const { version, type } = yield* select((state) =>
selectTracing(state, saveQueueType, tracingId),
@@ -162,6 +163,7 @@ export function* sendRequestToServer(saveQueueType: SaveQueueType, tracingId: st
compactedSaveQueue = addVersionNumbers(compactedSaveQueue, version);
let retryCount = 0;

// This while-loop only exists for the purpose of a retry-mechanism
while (true) {
let exceptionDuringMarkBucketsAsNotDirty = false;

@@ -11,5 +11,5 @@ export const UNDO_HISTORY_SIZE = 20;
export const SETTINGS_RETRY_DELAY = 15 * 1000;
export const SETTINGS_MAX_RETRY_COUNT = 20; // 20 * 15s == 5m

export const maximumActionCountPerBatch = 5000;
export const maximumActionCountPerSave = 15000;
export const maximumActionCountPerBatch = 20;
Review comment by philippotto (Member, Author): todo: revert

export const maximumActionCountPerSave = 60;
@@ -80,7 +80,7 @@ function upsampleVoxelMap(
]);

if (currentGoalBucket.type === "null") {
console.warn(warnAboutCouldNotCreate([...currentGoalBucketAddress, targetZoomStep]));
warnAboutCouldNotCreate([...currentGoalBucketAddress, targetZoomStep]);
continue;
}
