Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Silently ignore duplicate tracing save requests #3767

Merged
merged 18 commits into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions frontend/javascripts/libs/math.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// @noflow
// This module should be used to access the Math object, so it can be mocked in the unit tests
// mockRequire("libs/math", myFakeMath);

export default Math;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rename this file to uid_generator.js or something like that and then expose the request id generator, which can be mocked. This would limit the surface area of the mocked code a bit. Now, the mock for the save saga would need to be adapted as soon as we use more Math methods in the save saga (similar to how Math.min had to be re-added to the mock). Also, it suggests to always use this module instead of importing Math directly, but that's not really necessary in most cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's much better, thank you! :)

3 changes: 2 additions & 1 deletion frontend/javascripts/oxalis/model/reducers/save_reducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ function SaveReducer(state: OxalisState, action: Action): OxalisState {
[action.tracingType]: {
$push: [
{
// Placeholder, the version number will be updated before sending to the server
// Placeholder, the version number and requestId will be updated before sending to the server
version: -1,
requestId: "",
timestamp: Date.now(),
actions: items,
stats,
Expand Down
78 changes: 47 additions & 31 deletions frontend/javascripts/oxalis/model/sagas/save_saga.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import Request, { type RequestOptionsWithData } from "libs/request";
import Toast from "libs/toast";
import messages from "messages";
import window, { alert, document, location } from "libs/window";
import Math from "libs/math";

import { enforceSkeletonTracing } from "../accessors/skeletontracing_accessor";

Expand Down Expand Up @@ -162,10 +163,7 @@ function getRetryWaitTime(retryCount: number) {
return Math.min(2 ** retryCount * SAVE_RETRY_WAITING_TIME, MAX_SAVE_RETRY_WAITING_TIME);
}

export function* sendRequestToServer(
tracingType: "skeleton" | "volume",
retryCount: number = 0,
): Saga<void> {
export function* sendRequestToServer(tracingType: "skeleton" | "volume"): Saga<void> {
const fullSaveQueue = yield* select(state => state.save.queue[tracingType]);
const saveQueue = sliceAppropriateBatchCount(fullSaveQueue);

Expand All @@ -176,35 +174,46 @@ export function* sendRequestToServer(
const tracingStoreUrl = yield* select(state => state.tracing.tracingStore.url);
compactedSaveQueue = addVersionNumbers(compactedSaveQueue, version);

try {
yield* call(
sendRequestWithToken,
`${tracingStoreUrl}/tracings/${type}/${tracingId}/update?token=`,
{
method: "POST",
headers: { "X-Date": `${Date.now()}` },
data: compactedSaveQueue,
compress: true,
},
);
yield* put(setVersionNumberAction(version + compactedSaveQueue.length, tracingType));
yield* put(setLastSaveTimestampAction(tracingType));
yield* put(shiftSaveQueueAction(saveQueue.length, tracingType));
yield* call(toggleErrorHighlighting, false);
} catch (error) {
yield* call(toggleErrorHighlighting, true);
if (error.status === 409) {
// HTTP Code 409 'conflict' for dirty state
window.onbeforeunload = null;
yield* call(alert, messages["save.failed_simultaneous_tracing"]);
location.reload();
compactedSaveQueue = addRequestIds(
compactedSaveQueue,
Math.random()
.toString(36)
.substr(2, 10),
);

let retryCount = 0;
while (true) {
try {
yield* call(
sendRequestWithToken,
`${tracingStoreUrl}/tracings/${type}/${tracingId}/update?token=`,
{
method: "POST",
headers: { "X-Date": `${Date.now()}` },
data: compactedSaveQueue,
compress: true,
},
);
yield* put(setVersionNumberAction(version + compactedSaveQueue.length, tracingType));
yield* put(setLastSaveTimestampAction(tracingType));
yield* put(shiftSaveQueueAction(saveQueue.length, tracingType));
yield* call(toggleErrorHighlighting, false);
return;
} catch (error) {
yield* call(toggleErrorHighlighting, true);
if (error.status === 409) {
// HTTP Code 409 'conflict' for dirty state
window.onbeforeunload = null;
yield* call(alert, messages["save.failed_simultaneous_tracing"]);
location.reload();
return;
}
yield* race({
timeout: _call(delay, getRetryWaitTime(retryCount)),
forcePush: _take("SAVE_NOW"),
});
retryCount++;
}
yield* race({
timeout: _call(delay, getRetryWaitTime(retryCount)),
forcePush: _take("SAVE_NOW"),
});
yield* call(sendRequestToServer, tracingType, retryCount + 1);
}
}

Expand All @@ -226,6 +235,13 @@ export function addVersionNumbers(
return updateActionsBatches.map(batch => Object.assign({}, batch, { version: ++lastVersion }));
}

export function addRequestIds(
updateActionsBatches: Array<SaveQueueEntry>,
requestId: string,
): Array<SaveQueueEntry> {
return updateActionsBatches.map(batch => Object.assign({}, batch, { requestId }));
}

function removeUnrelevantUpdateActions(updateActions: Array<UpdateAction>) {
// This functions removes update actions that should not be sent to the server.
return updateActions.filter(ua => ua.name !== "toggleTree");
Expand Down
1 change: 1 addition & 0 deletions frontend/javascripts/oxalis/store.js
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ export type SaveQueueEntry = {
actions: Array<UpdateAction>,
stats: ?SkeletonTracingStats,
info: string,
requestId: string,
};

export type ProgressInfo = {
Expand Down
1 change: 1 addition & 0 deletions frontend/javascripts/test/helpers/saveHelpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { UpdateAction } from "oxalis/model/sagas/update_actions";
export function createSaveQueueFromUpdateActions(updateActions, timestamp, stats = null) {
return updateActions.map(ua => ({
version: -1,
requestId: "",
timestamp,
stats,
actions: [].concat(ua),
Expand Down
47 changes: 33 additions & 14 deletions frontend/javascripts/test/sagas/save_saga.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ const DateMock = {
};
mockRequire("libs/date", DateMock);

const RANDOM_FLOAT = 0.1616828181890917;
const MathMock = {
random: () => RANDOM_FLOAT,
min: Math.min,
};
mockRequire("libs/math", MathMock);
const REQUEST_ID = RANDOM_FLOAT.toString(36).substr(2, 10);

mockRequire("oxalis/model/sagas/root_saga", function*() {
yield;
});
Expand All @@ -32,6 +40,7 @@ const {
sendRequestToServer,
toggleErrorHighlighting,
addVersionNumbers,
addRequestIds,
sendRequestWithToken,
} = mockRequire.reRequire("oxalis/model/sagas/save_saga");

Expand Down Expand Up @@ -120,7 +129,10 @@ test("SaveSaga should send request to server", t => {
saga.next();
saga.next(saveQueue);
saga.next({ version: LAST_VERSION, type: TRACING_TYPE, tracingId: "1234567890" });
const saveQueueWithVersions = addVersionNumbers(saveQueue, LAST_VERSION);
const saveQueueWithVersions = addRequestIds(
addVersionNumbers(saveQueue, LAST_VERSION),
REQUEST_ID,
);
expectValueDeepEqual(
t,
saga.next(TRACINGSTORE_URL),
Expand All @@ -138,28 +150,32 @@ test("SaveSaga should retry update actions", t => {
[UpdateActions.createEdge(1, 0, 1), UpdateActions.createEdge(1, 1, 2)],
TIMESTAMP,
);

const saga = sendRequestToServer(TRACING_TYPE);
saga.next();
saga.next(saveQueue);
saga.next({ version: LAST_VERSION, type: TRACING_TYPE, tracingId: "1234567890" });
const saveQueueWithVersions = addVersionNumbers(saveQueue, LAST_VERSION);
expectValueDeepEqual(
t,
saga.next(TRACINGSTORE_URL),
call(sendRequestWithToken, `${TRACINGSTORE_URL}/tracings/skeleton/1234567890/update?token=`, {
const saveQueueWithVersions = addRequestIds(
addVersionNumbers(saveQueue, LAST_VERSION),
REQUEST_ID,
);
const requestWithTokenCall = call(
sendRequestWithToken,
`${TRACINGSTORE_URL}/tracings/skeleton/1234567890/update?token=`,
{
method: "POST",
headers: { "X-Date": `${TIMESTAMP}` },
data: saveQueueWithVersions,
compress: true,
}),
},
);

const saga = sendRequestToServer(TRACING_TYPE);
saga.next();
saga.next(saveQueue);
saga.next({ version: LAST_VERSION, type: TRACING_TYPE, tracingId: "1234567890" });
expectValueDeepEqual(t, saga.next(TRACINGSTORE_URL), requestWithTokenCall);

expectValueDeepEqual(t, saga.throw("Timeout"), call(toggleErrorHighlighting, true));
// wait for retry
saga.next();
// should retry
expectValueDeepEqual(t, saga.next(), call(sendRequestToServer, TRACING_TYPE, 1));
expectValueDeepEqual(t, saga.next(), requestWithTokenCall);
});

test("SaveSaga should escalate on permanent client error update actions", t => {
Expand All @@ -172,7 +188,10 @@ test("SaveSaga should escalate on permanent client error update actions", t => {
saga.next();
saga.next(saveQueue);
saga.next({ version: LAST_VERSION, type: TRACING_TYPE, tracingId: "1234567890" });
const saveQueueWithVersions = addVersionNumbers(saveQueue, LAST_VERSION);
const saveQueueWithVersions = addRequestIds(
addVersionNumbers(saveQueue, LAST_VERSION),
REQUEST_ID,
);
expectValueDeepEqual(
t,
saga.next(TRACINGSTORE_URL),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ class TemporaryStore[K, V] @Inject()(system: ActorSystem) {
map.get(id)
}

def contains(id: K) =
map.synchronized(
map.contains(id)
)

def findAll =
map.synchronized {
map.values.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.scalableminds.webknossos.tracingstore.tracings.TracingSelector
import com.scalableminds.webknossos.tracingstore.tracings.skeleton._
import com.scalableminds.util.tools.JsonHelper.boxFormat
import com.scalableminds.util.tools.JsonHelper.optionFormat
import com.scalableminds.webknossos.datastore.storage.TemporaryStore
import play.api.i18n.Messages
import play.api.libs.json.Json
import play.api.mvc.PlayBodyParsers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import com.scalableminds.webknossos.tracingstore.{TracingStoreAccessTokenService
import com.scalableminds.webknossos.tracingstore.tracings.{TracingSelector, TracingService, UpdateAction, UpdateActionGroup}
import com.scalableminds.util.tools.JsonHelper.boxFormat
import com.scalableminds.util.tools.JsonHelper.optionFormat
import com.scalableminds.webknossos.datastore.storage.TemporaryStore
import net.liftweb.common.Failure
import play.api.i18n.Messages
import scala.concurrent.duration._
import play.api.libs.json.{Json, Reads}
import play.api.mvc.PlayBodyParsers
import scalapb.{GeneratedMessage, GeneratedMessageCompanion, Message}
Expand Down Expand Up @@ -110,11 +112,18 @@ trait TracingController[T <: GeneratedMessage with Message[T],
val userToken = request.getQueryString("token")
webKnossosServer.reportTracingUpdates(tracingId, timestamps, latestStatistics, userToken).flatMap { _ =>
updateGroups.foldLeft(currentVersion) { (previousVersion, updateGroup) =>
previousVersion.flatMap { version =>
if (version + 1 == updateGroup.version || freezeVersions) {
tracingService.handleUpdateGroup(tracingId, updateGroup, version).map(_ => if (freezeVersions) version else updateGroup.version)
previousVersion.flatMap { prevVersion =>
if (prevVersion + 1 == updateGroup.version || freezeVersions) {
tracingService.handleUpdateGroup(tracingId, updateGroup, prevVersion)
.map(_ => Fox.successful(tracingService.saveToHandledGroupCache(tracingId, updateGroup.version, updateGroup.requestId)))
.map(_ => if (freezeVersions) prevVersion else updateGroup.version)
} else {
Failure(s"Incorrect version. Expected: ${version + 1}; Got: ${updateGroup.version}") ~> CONFLICT
if ( updateGroup.requestId.exists(requestId => tracingService.handledGroupCacheContains(requestId, tracingId, updateGroup.version))) {
//this update group was received and successfully saved in a previous request. silently ignore this duplicate request
Fox.successful(if (freezeVersions) prevVersion else updateGroup.version)
} else {
Failure(s"Incorrect version. Expected: ${prevVersion + 1}; Got: ${updateGroup.version}") ~> CONFLICT
}
}
}
}
Expand All @@ -123,4 +132,5 @@ trait TracingController[T <: GeneratedMessage with Message[T],
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.scalableminds.webknossos.tracingstore.tracings._
import com.scalableminds.webknossos.tracingstore.tracings.volume.VolumeTracingService
import com.scalableminds.util.tools.JsonHelper.boxFormat
import com.scalableminds.util.tools.JsonHelper.optionFormat
import com.scalableminds.webknossos.datastore.storage.TemporaryStore
import play.api.i18n.Messages
import play.api.libs.iteratee.Enumerator
import play.api.libs.iteratee.streams.IterateeStreams
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.scalableminds.webknossos.tracingstore.tracings
import java.util.UUID

import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.datastore.storage.TemporaryStore
import scalapb.{GeneratedMessage, GeneratedMessageCompanion, Message}
import com.typesafe.scalalogging.LazyLogging
import play.api.libs.json.Reads
Expand All @@ -12,12 +13,16 @@ import scala.concurrent.duration._

trait TracingService[T <: GeneratedMessage with Message[T]] extends KeyValueStoreImplicits with FoxImplicits with LazyLogging {

val handledGroupCacheExpiry: FiniteDuration = 5 minutes

def tracingType: TracingType.Value

def tracingStore: FossilDBClient

def temporaryTracingStore: TemporaryTracingStore[T]

val handledGroupCache: TemporaryStore[(String, String, Long), Unit]

implicit def tracingCompanion: GeneratedMessageCompanion[T]

implicit val updateActionReads: Reads[UpdateAction[T]]
Expand Down Expand Up @@ -66,4 +71,13 @@ trait TracingService[T <: GeneratedMessage with Message[T]] extends KeyValueStor
tracingStore.put(id, version, tracing).map(_ => id)
}
}

def saveToHandledGroupCache(tracingId: String, version: Long, requestIdOpt: Option[String]): Unit = {
requestIdOpt.foreach { requestId =>
handledGroupCache.insert((requestId, tracingId, version), (), Some(handledGroupCacheExpiry))
}
}

def handledGroupCacheContains(requestId: String, tracingId: String, version: Long) =
handledGroupCache.contains(requestId, tracingId, version)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ case class UpdateActionGroup[T <: GeneratedMessage with Message[T]](
timestamp: Long,
actions: List[UpdateAction[T]],
stats: Option[JsObject],
info: Option[String])
info: Option[String],
requestId: Option[String]
)

object UpdateActionGroup {

Expand All @@ -41,8 +43,9 @@ object UpdateActionGroup {
actions <- json.validate((JsPath \ "actions").read[List[UpdateAction[T]]])
stats <- json.validate((JsPath \ "stats").readNullable[JsObject])
info <- json.validate((JsPath \ "info").readNullable[String])
id <- json.validate((JsPath \ "requestId").readNullable[String])
} yield {
UpdateActionGroup[T](version, timestamp, actions, stats, info)
UpdateActionGroup[T](version, timestamp, actions, stats, info, id)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.scalableminds.webknossos.tracingstore.tracings.skeleton
import com.google.inject.Inject
import com.scalableminds.util.geometry.BoundingBox
import com.scalableminds.util.tools.{Fox, FoxImplicits, TextUtils}
import com.scalableminds.webknossos.datastore.storage.TemporaryStore
import com.scalableminds.webknossos.tracingstore.SkeletonTracing.SkeletonTracing
import com.scalableminds.webknossos.tracingstore.tracings.UpdateAction.SkeletonUpdateAction
import com.scalableminds.webknossos.tracingstore.tracings._
Expand All @@ -13,7 +14,8 @@ import play.api.libs.json.{JsObject, Json, Writes}
import scala.concurrent.ExecutionContext

class SkeletonTracingService @Inject()(tracingDataStore: TracingDataStore,
val temporaryTracingStore: TemporaryTracingStore[SkeletonTracing])
val temporaryTracingStore: TemporaryTracingStore[SkeletonTracing],
val handledGroupCache: TemporaryStore[(String, String, Long), Unit])
(implicit ec: ExecutionContext)
extends TracingService[SkeletonTracing]
with KeyValueStoreImplicits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import com.scalableminds.webknossos.datastore.DataStoreConfig
import com.scalableminds.webknossos.datastore.models.DataRequestCollection.DataRequestCollection
import com.scalableminds.webknossos.datastore.models.requests.DataServiceDataRequest
import com.scalableminds.webknossos.datastore.services.BinaryDataService
import com.scalableminds.webknossos.datastore.storage.TemporaryStore
import com.scalableminds.webknossos.tracingstore.TracingStoreConfig
import com.scalableminds.webknossos.wrap.WKWFile
import com.typesafe.scalalogging.LazyLogging
Expand All @@ -30,6 +31,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
class VolumeTracingService @Inject()(
tracingDataStore: TracingDataStore,
config: TracingStoreConfig,
val handledGroupCache: TemporaryStore[(String, String, Long), Unit],
val temporaryTracingStore: TemporaryTracingStore[VolumeTracing]
)
extends TracingService[VolumeTracing]
Expand Down