Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

If the ingest worker tries to update a non-existent ingest, retry #739

Merged
merged 13 commits into from
Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import uk.ac.wellcome.platform.storage.ingests_tracker.services.MessagingService
import uk.ac.wellcome.platform.storage.ingests_tracker.tracker.{
IngestDoesNotExistError,
IngestTracker,
StateConflictError
StateConflictError,
UpdateNonExistentIngestError
}
import uk.ac.wellcome.storage.Identified
import uk.ac.wellcome.typesafe.Runnable
Expand Down Expand Up @@ -70,6 +71,9 @@ class IngestsTrackerApi[CallbackDestination, IngestsDestination](
info(s"Updated ingest: $ingest")
messagingService.send(ingest)
complete(StatusCodes.OK -> ingest)
case Left(UpdateNonExistentIngestError(e)) =>
error(s"Could not find ingest $id to update: $e")
complete(StatusCodes.NotFound)
case Left(e: StateConflictError) =>
error(s"Ingest $id can not be updated", e)
complete(StatusCodes.Conflict)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class AkkaIngestTrackerClient(trackerHost: Uri)(implicit as: ActorSystem)
case StatusCodes.OK =>
info(f"OK for PATCH to $requestUri with $ingestUpdate")
Unmarshal(response.entity).to[Ingest].map(Right(_))
case StatusCodes.NotFound =>
warn(f"Not Found for PATCH to $requestUri with $ingestUpdate")
Future(Left(IngestTrackerUpdateNonExistentIngestError(ingestUpdate)))
case StatusCodes.Conflict =>
warn(f"Conflict for PATCH to $requestUri with $ingestUpdate")
Future(Left(IngestTrackerUpdateConflictError(ingestUpdate)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ sealed trait IngestTrackerUpdateError extends IngestTrackerError {
val ingestUpdate: IngestUpdate
}

case class IngestTrackerUpdateNonExistentIngestError(ingestUpdate: IngestUpdate)
extends IngestTrackerUpdateError

case class IngestTrackerUpdateConflictError(ingestUpdate: IngestUpdate)
extends IngestTrackerUpdateError

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import uk.ac.wellcome.platform.archive.common.bagit.models.BagVersion
import uk.ac.wellcome.platform.archive.common.ingests.models.{Callback, Ingest}
import uk.ac.wellcome.storage.{
NotFoundError,
StorageError,
UpdateNoSourceError,
VersionAlreadyExistsError
}
Expand Down Expand Up @@ -37,9 +38,9 @@ case class MismatchedVersionUpdateError(

case class NoCallbackOnIngestError() extends IngestStoreError

case class IngestStoreUnexpectedError(e: Throwable) extends IngestStoreError {

case class IngestStoreUnexpectedError(storageError: StorageError)
extends IngestStoreError {
override def toString: String = {
s"IngestStoreUnexpectedError: ${e.toString}"
Comment on lines -40 to -43
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discarding the outer wrapper meant the Throwable we got was sometimes null, which is unhelpful.

s"IngestStoreUnexpectedError: $storageError"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ trait IngestTracker extends Logging {
case Left(err: VersionAlreadyExistsError) =>
Left(IngestAlreadyExistsError(err))
case Left(err: StorageError) =>
Left(IngestStoreUnexpectedError(err.e))
Left(IngestStoreUnexpectedError(err))
}

def get(id: IngestID): Result =
Expand All @@ -28,7 +28,7 @@ trait IngestTracker extends Logging {
case Left(err: NotFoundError) =>
Left(IngestDoesNotExistError(err))
case Left(err: StorageError) =>
Left(IngestStoreUnexpectedError(err.e))
Left(IngestStoreUnexpectedError(err))
}

def update(update: IngestUpdate): Result = {
Expand All @@ -44,7 +44,7 @@ trait IngestTracker extends Logging {
case err: UpdateNoSourceError =>
UpdateNonExistentIngestError(err)
case err =>
IngestStoreUnexpectedError(err.e)
IngestStoreUnexpectedError(err)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
import grizzled.slf4j.Logging
import org.scanamo.auto._
import org.scanamo.time.JavaTimeFormats._
import uk.ac.wellcome.platform.archive.common.ingests.models.{Ingest, IngestID}
import uk.ac.wellcome.platform.archive.common.ingests.models.{
Ingest,
IngestID,
IngestUpdate
}
import uk.ac.wellcome.platform.archive.common.ingests.models.IngestID._
import uk.ac.wellcome.platform.storage.ingests_tracker.tracker.IngestTracker
import uk.ac.wellcome.platform.storage.ingests_tracker.tracker.{
IngestTracker,
UpdateNonExistentIngestError
}
import uk.ac.wellcome.storage.dynamo._
import uk.ac.wellcome.storage.store.VersionedStore
import uk.ac.wellcome.storage.store.dynamo.DynamoSingleVersionStore
Expand All @@ -18,4 +25,15 @@ class DynamoIngestTracker(config: DynamoConfig)(

override val underlying: VersionedStore[IngestID, Int, Ingest] =
new DynamoSingleVersionStore[IngestID, Ingest](config)

override def update(update: IngestUpdate): Result =
super.update(update) match {
case Left(UpdateNonExistentIngestError(err)) =>
warn(
s"DynamoDB could not find ingest ${update.id} to update. " +
"DynamoDB reads are eventually consistent and this may be fixed on retrying."
)
Left(UpdateNonExistentIngestError(err))
case result => result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,3 @@ import uk.ac.wellcome.storage.store.memory.MemoryVersionedStore
class MemoryIngestTracker(
val underlying: MemoryVersionedStore[IngestID, Ingest]
) extends IngestTracker

object MemoryIngestTracker {
def apply(): MemoryIngestTracker =
new MemoryIngestTracker(
underlying = MemoryVersionedStore[IngestID, Ingest](
initialEntries = Map.empty
)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,8 @@ class IngestsTrackerApiFeatureTest
withIngestsTrackerApi() {
case (callbackSender, ingestsSender, ingestTracker) =>
whenPatchRequestReady(path, ingestEventEntity) { response =>
it("responds with Conflict") {
response.status shouldBe StatusCodes.Conflict
it("responds with NotFound") {
response.status shouldBe StatusCodes.NotFound
}

it("does not create the Ingest") {
Expand All @@ -392,7 +392,6 @@ class IngestsTrackerApiFeatureTest
it("does not send a CallbackNotification message") {
callbackSender.getMessages[CallbackNotification] shouldBe empty
}

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ trait IngestTrackerClientTestCases
it("fails to apply a conflicting update") {
val initialIngest = createIngestWith(status = Succeeded)
val failedUpdate = createIngestStatusUpdateWith(
id = ingest.id,
id = initialIngest.id,
status = Ingest.Failed
)

Expand All @@ -157,6 +157,21 @@ trait IngestTrackerClientTestCases
}
}

it("errors if you apply an update to a non-existent ingest") {
val ingest = createIngest
val update = createIngestEventUpdate

withIngestsTracker(ingest) { _ =>
withIngestTrackerClient(trackerUri) { client =>
whenReady(client.updateIngest(update)) {
_.left.value shouldBe IngestTrackerUpdateNonExistentIngestError(
update
)
}
}
}
}

it("errors if the tracker API returns a 500 error") {
withBrokenIngestsTrackerApi { _ =>
withIngestTrackerClient(trackerUri) { client =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ trait IngestTrackerFixtures extends EitherValues with TimeTestFixture {
): MemoryIngestTracker = {
initialIngests
.map { ingest =>
store.init(ingest.id)(ingest)
store.init(ingest.id)(ingest) shouldBe a[Right[_, _]]
}

new MemoryIngestTracker(store)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ import uk.ac.wellcome.platform.storage.ingests_tracker.services.{
}
import uk.ac.wellcome.platform.storage.ingests_tracker.tracker.IngestStoreUnexpectedError
import uk.ac.wellcome.platform.storage.ingests_tracker.tracker.memory.MemoryIngestTracker
import uk.ac.wellcome.storage.Version
import uk.ac.wellcome.storage.{
StoreReadError,
StoreWriteError,
UpdateWriteError,
Version
}
import uk.ac.wellcome.storage.maxima.memory.MemoryMaxima
import uk.ac.wellcome.storage.store.memory.{MemoryStore, MemoryVersionedStore}

Expand Down Expand Up @@ -68,13 +73,19 @@ trait IngestsTrackerApiFixture
)
) {
override def get(id: IngestID): Result =
Left(IngestStoreUnexpectedError(new Throwable("BOOM!")))
Left(IngestStoreUnexpectedError(StoreReadError(new Throwable("BOOM!"))))

override def init(ingest: Ingest): Result =
Left(IngestStoreUnexpectedError(new Throwable("BOOM!")))
Left(
IngestStoreUnexpectedError(StoreWriteError(new Throwable("BOOM!")))
)

override def update(update: IngestUpdate): Result =
Left(IngestStoreUnexpectedError(new Throwable("BOOM!")))
Left(
IngestStoreUnexpectedError(
UpdateWriteError(StoreWriteError(new Throwable("BOOM!")))
)
)
}

val callbackSender = new MemoryMessageSender()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class DynamoIngestTrackerTest
override def withContext[R](
testWith: TestWith[DynamoTable, R]
): R =
withSpecifiedTable(createIngestTrackerTable) { ingestTrackerTable =>
withLocalDynamoDbTable { ingestTrackerTable =>
testWith(ingestTrackerTable)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import uk.ac.wellcome.platform.archive.common.ingests.models.{
import uk.ac.wellcome.platform.storage.ingests_tracker.client.{
IngestTrackerClient,
IngestTrackerUnknownUpdateError,
IngestTrackerUpdateConflictError
IngestTrackerUpdateConflictError,
IngestTrackerUpdateNonExistentIngestError
}
import uk.ac.wellcome.typesafe.Runnable

Expand Down Expand Up @@ -66,6 +67,14 @@ class IngestsWorkerService(
)
warn(err)
DeterministicFailure[Ingest](err)
// This may be caused by something like a consistency issue in DynamoDB;
// if we retry later the ingest may become available.
// See https://github.com/wellcomecollection/platform/issues/4781
case Left(IngestTrackerUpdateNonExistentIngestError(_)) =>
error(s"Could not apply $ingestUpdate to non-existent ingest")
NonDeterministicFailure[Ingest](
new Throwable(s"Could not apply $ingestUpdate to non-existent ingest")
)
case Left(IngestTrackerUnknownUpdateError(_, err)) =>
error(s"Error trying to apply $ingestUpdate, got UnknownError", err)
NonDeterministicFailure[Ingest](err)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package uk.ac.wellcome.platform.storage.ingests_worker.services

import akka.http.scaladsl.model.Uri
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import uk.ac.wellcome.fixtures.TestWith
import uk.ac.wellcome.messaging.worker.models.{
DeterministicFailure,
NonDeterministicFailure,
Expand All @@ -11,6 +13,11 @@ import uk.ac.wellcome.messaging.worker.models.{
}
import uk.ac.wellcome.platform.archive.common.fixtures.HttpFixtures
import uk.ac.wellcome.platform.archive.common.ingests.models.Ingest
import uk.ac.wellcome.platform.storage.ingests_tracker.client.{
AkkaIngestTrackerClient,
IngestTrackerClient
}
import uk.ac.wellcome.platform.storage.ingests_tracker.fixtures.IngestsTrackerApiFixture
import uk.ac.wellcome.platform.storage.ingests_worker.fixtures.IngestsWorkerFixtures

class IngestsWorkerServiceTest
Expand All @@ -20,6 +27,7 @@ class IngestsWorkerServiceTest
with HttpFixtures
with ScalaFutures
with IngestsWorkerFixtures
with IngestsTrackerApiFixture
with IntegrationPatience {

val visibilityTimeout = 5
Expand Down Expand Up @@ -71,4 +79,26 @@ class IngestsWorkerServiceTest
}
}
}

it("updating a non-existent ingest is a non-deterministic failure") {
withIngestsTrackerApi(initialIngests = Seq.empty) { _ =>
withIngestTrackerClient(trackerUri) { client =>
withIngestWorker(ingestTrackerClient = client) { worker =>
whenReady(worker.processMessage(createIngestEventUpdate)) {
_ shouldBe a[NonDeterministicFailure[_]]
}
}
}
}
}

def withIngestTrackerClient[R](
trackerUri: String
)(testWith: TestWith[IngestTrackerClient, R]): R =
withActorSystem { implicit actorSystem =>
val client = new AkkaIngestTrackerClient(trackerHost = Uri(trackerUri))

testWith(client)
}

}