diff --git a/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/IngestsTrackerApi.scala b/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/IngestsTrackerApi.scala index 4bf4560a90..183f9270f1 100644 --- a/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/IngestsTrackerApi.scala +++ b/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/IngestsTrackerApi.scala @@ -17,7 +17,8 @@ import uk.ac.wellcome.platform.storage.ingests_tracker.services.MessagingService import uk.ac.wellcome.platform.storage.ingests_tracker.tracker.{ IngestDoesNotExistError, IngestTracker, - StateConflictError + StateConflictError, + UpdateNonExistentIngestError } import uk.ac.wellcome.storage.Identified import uk.ac.wellcome.typesafe.Runnable @@ -70,6 +71,9 @@ class IngestsTrackerApi[CallbackDestination, IngestsDestination]( info(s"Updated ingest: $ingest") messagingService.send(ingest) complete(StatusCodes.OK -> ingest) + case Left(UpdateNonExistentIngestError(e)) => + error(s"Could not find ingest $id to update: $e") + complete(StatusCodes.NotFound) case Left(e: StateConflictError) => error(s"Ingest $id can not be updated", e) complete(StatusCodes.Conflict) diff --git a/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/client/IngestTrackerClient.scala b/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/client/IngestTrackerClient.scala index 0724fa3522..40356003dd 100644 --- a/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/client/IngestTrackerClient.scala +++ b/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/client/IngestTrackerClient.scala @@ -90,6 +90,9 @@ class AkkaIngestTrackerClient(trackerHost: Uri)(implicit as: ActorSystem) case StatusCodes.OK => info(f"OK for PATCH to $requestUri with $ingestUpdate") Unmarshal(response.entity).to[Ingest].map(Right(_)) + case StatusCodes.NotFound => + warn(f"Not Found for PATCH to $requestUri with $ingestUpdate") + Future(Left(IngestTrackerUpdateNonExistentIngestError(ingestUpdate))) case StatusCodes.Conflict => warn(f"Conflict for PATCH to $requestUri with $ingestUpdate") Future(Left(IngestTrackerUpdateConflictError(ingestUpdate))) diff --git a/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/client/IngestTrackerError.scala b/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/client/IngestTrackerError.scala index 57e07a25ec..99f4d92de0 100644 --- a/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/client/IngestTrackerError.scala +++ b/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/client/IngestTrackerError.scala @@ -22,6 +22,9 @@ sealed trait IngestTrackerUpdateError extends IngestTrackerError { val ingestUpdate: IngestUpdate } +case class IngestTrackerUpdateNonExistentIngestError(ingestUpdate: IngestUpdate) + extends IngestTrackerUpdateError + case class IngestTrackerUpdateConflictError(ingestUpdate: IngestUpdate) extends IngestTrackerUpdateError diff --git a/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/IngestError.scala b/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/IngestError.scala index 4e365a64e7..e8a500f300 100644 --- a/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/IngestError.scala +++ b/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/IngestError.scala @@ -4,6 +4,7 @@ import uk.ac.wellcome.platform.archive.common.bagit.models.BagVersion import uk.ac.wellcome.platform.archive.common.ingests.models.{Callback, Ingest} import uk.ac.wellcome.storage.{ NotFoundError, + StorageError, UpdateNoSourceError, VersionAlreadyExistsError } @@ -37,9 +38,9 @@ case class MismatchedVersionUpdateError( case class NoCallbackOnIngestError() extends IngestStoreError -case class IngestStoreUnexpectedError(e: Throwable) extends IngestStoreError { - +case class IngestStoreUnexpectedError(storageError: StorageError) + extends IngestStoreError { override def toString: String = { - s"IngestStoreUnexpectedError: ${e.toString}" + s"IngestStoreUnexpectedError: $storageError" } } diff --git a/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/IngestTracker.scala b/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/IngestTracker.scala index f3bae1e2b2..fa8e94a66f 100644 --- a/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/IngestTracker.scala +++ b/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/IngestTracker.scala @@ -18,7 +18,7 @@ trait IngestTracker extends Logging { case Left(err: VersionAlreadyExistsError) => Left(IngestAlreadyExistsError(err)) case Left(err: StorageError) => - Left(IngestStoreUnexpectedError(err.e)) + Left(IngestStoreUnexpectedError(err)) } def get(id: IngestID): Result = @@ -28,7 +28,7 @@ trait IngestTracker extends Logging { case Left(err: NotFoundError) => Left(IngestDoesNotExistError(err)) case Left(err: StorageError) => - Left(IngestStoreUnexpectedError(err.e)) + Left(IngestStoreUnexpectedError(err)) } def update(update: IngestUpdate): Result = { @@ -44,7 +44,7 @@ trait IngestTracker extends Logging { case err: UpdateNoSourceError => UpdateNonExistentIngestError(err) case err => - IngestStoreUnexpectedError(err.e) + IngestStoreUnexpectedError(err) } } } diff --git a/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/dynamo/DynamoIngestTracker.scala b/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/dynamo/DynamoIngestTracker.scala index 62e756da09..31391e8ef6 100644 --- a/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/dynamo/DynamoIngestTracker.scala +++ b/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/dynamo/DynamoIngestTracker.scala @@ -4,9 +4,16 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB import grizzled.slf4j.Logging import org.scanamo.auto._ import org.scanamo.time.JavaTimeFormats._ -import uk.ac.wellcome.platform.archive.common.ingests.models.{Ingest, IngestID} +import uk.ac.wellcome.platform.archive.common.ingests.models.{ + Ingest, + IngestID, + IngestUpdate +} import uk.ac.wellcome.platform.archive.common.ingests.models.IngestID._ -import uk.ac.wellcome.platform.storage.ingests_tracker.tracker.IngestTracker +import uk.ac.wellcome.platform.storage.ingests_tracker.tracker.{ + IngestTracker, + UpdateNonExistentIngestError +} import uk.ac.wellcome.storage.dynamo._ import uk.ac.wellcome.storage.store.VersionedStore import uk.ac.wellcome.storage.store.dynamo.DynamoSingleVersionStore @@ -18,4 +25,15 @@ class DynamoIngestTracker(config: DynamoConfig)( override val underlying: VersionedStore[IngestID, Int, Ingest] = new DynamoSingleVersionStore[IngestID, Ingest](config) + + override def update(update: IngestUpdate): Result = + super.update(update) match { + case Left(UpdateNonExistentIngestError(err)) => + warn( + s"DynamoDB could not find ingest ${update.id} to update. " + + "DynamoDB reads are eventually consistent and this may be fixed on retrying." + ) + Left(UpdateNonExistentIngestError(err)) + case result => result + } } diff --git a/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/memory/MemoryIngestTracker.scala b/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/memory/MemoryIngestTracker.scala index 95588e9377..fe52e26881 100644 --- a/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/memory/MemoryIngestTracker.scala +++ b/ingests/ingests_tracker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/memory/MemoryIngestTracker.scala @@ -7,12 +7,3 @@ import uk.ac.wellcome.storage.store.memory.MemoryVersionedStore class MemoryIngestTracker( val underlying: MemoryVersionedStore[IngestID, Ingest] ) extends IngestTracker - -object MemoryIngestTracker { - def apply(): MemoryIngestTracker = - new MemoryIngestTracker( - underlying = MemoryVersionedStore[IngestID, Ingest]( - initialEntries = Map.empty - ) - ) -} diff --git a/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/IngestsTrackerApiFeatureTest.scala b/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/IngestsTrackerApiFeatureTest.scala index d2aabcd1a6..8976f6b0c5 100644 --- a/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/IngestsTrackerApiFeatureTest.scala +++ b/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/IngestsTrackerApiFeatureTest.scala @@ -374,8 +374,8 @@ class IngestsTrackerApiFeatureTest withIngestsTrackerApi() { case (callbackSender, ingestsSender, ingestTracker) => whenPatchRequestReady(path, ingestEventEntity) { response => - it("responds with Conflict") { - response.status shouldBe StatusCodes.Conflict + it("responds with NotFound") { + response.status shouldBe StatusCodes.NotFound } it("does not create the Ingest") { @@ -392,7 +392,6 @@ class IngestsTrackerApiFeatureTest it("does not send a CallbackNotification message") { callbackSender.getMessages[CallbackNotification] shouldBe empty } - } } } diff --git a/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/client/IngestTrackerClientTest.scala b/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/client/IngestTrackerClientTest.scala index dc1b6da725..ddefe93499 100644 --- a/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/client/IngestTrackerClientTest.scala +++ b/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/client/IngestTrackerClientTest.scala @@ -135,7 +135,7 @@ trait IngestTrackerClientTestCases it("fails to apply a conflicting update") { val initialIngest = createIngestWith(status = Succeeded) val failedUpdate = createIngestStatusUpdateWith( - id = ingest.id, + id = initialIngest.id, status = Ingest.Failed ) @@ -157,6 +157,21 @@ trait IngestTrackerClientTestCases } } + it("errors if you apply an update to a non-existent ingest") { + val ingest = createIngest + val update = createIngestEventUpdate + + withIngestsTracker(ingest) { _ => + withIngestTrackerClient(trackerUri) { client => + whenReady(client.updateIngest(update)) { + _.left.value shouldBe IngestTrackerUpdateNonExistentIngestError( + update + ) + } + } + } + } + it("errors if the tracker API returns a 500 error") { withBrokenIngestsTrackerApi { _ => withIngestTrackerClient(trackerUri) { client => diff --git a/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/fixtures/IngestTrackerFixtures.scala b/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/fixtures/IngestTrackerFixtures.scala index cf0bfada7c..9768cb3bc7 100644 --- a/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/fixtures/IngestTrackerFixtures.scala +++ b/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/fixtures/IngestTrackerFixtures.scala @@ -64,7 +64,7 @@ trait IngestTrackerFixtures extends EitherValues with TimeTestFixture { ): MemoryIngestTracker = { initialIngests .map { ingest => - store.init(ingest.id)(ingest) + store.init(ingest.id)(ingest) shouldBe a[Right[_, _]] } new MemoryIngestTracker(store) diff --git a/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/fixtures/IngestsTrackerApiFixture.scala b/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/fixtures/IngestsTrackerApiFixture.scala index a11327ef84..c82e0e334b 100644 --- a/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/fixtures/IngestsTrackerApiFixture.scala +++ b/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/fixtures/IngestsTrackerApiFixture.scala @@ -16,7 +16,12 @@ import uk.ac.wellcome.platform.storage.ingests_tracker.services.{ } import uk.ac.wellcome.platform.storage.ingests_tracker.tracker.IngestStoreUnexpectedError import uk.ac.wellcome.platform.storage.ingests_tracker.tracker.memory.MemoryIngestTracker -import uk.ac.wellcome.storage.Version +import uk.ac.wellcome.storage.{ + StoreReadError, + StoreWriteError, + UpdateWriteError, + Version +} import uk.ac.wellcome.storage.maxima.memory.MemoryMaxima import uk.ac.wellcome.storage.store.memory.{MemoryStore, MemoryVersionedStore} @@ -68,13 +73,19 @@ trait IngestsTrackerApiFixture ) ) { override def get(id: IngestID): Result = - Left(IngestStoreUnexpectedError(new Throwable("BOOM!"))) + Left(IngestStoreUnexpectedError(StoreReadError(new Throwable("BOOM!")))) override def init(ingest: Ingest): Result = - Left(IngestStoreUnexpectedError(new Throwable("BOOM!"))) + Left( + IngestStoreUnexpectedError(StoreWriteError(new Throwable("BOOM!"))) + ) override def update(update: IngestUpdate): Result = - Left(IngestStoreUnexpectedError(new Throwable("BOOM!"))) + Left( + IngestStoreUnexpectedError( + UpdateWriteError(StoreWriteError(new Throwable("BOOM!"))) + ) + ) } val callbackSender = new MemoryMessageSender() diff --git a/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/dynamo/DynamoIngestTrackerTest.scala b/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/dynamo/DynamoIngestTrackerTest.scala index b2410dc789..917cf2fe21 100644 --- a/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/dynamo/DynamoIngestTrackerTest.scala +++ b/ingests/ingests_tracker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_tracker/tracker/dynamo/DynamoIngestTrackerTest.scala @@ -37,7 +37,7 @@ class DynamoIngestTrackerTest override def withContext[R]( testWith: TestWith[DynamoTable, R] ): R = - withSpecifiedTable(createIngestTrackerTable) { ingestTrackerTable => + withLocalDynamoDbTable { ingestTrackerTable => testWith(ingestTrackerTable) } diff --git a/ingests/ingests_worker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_worker/services/IngestsWorkerService.scala b/ingests/ingests_worker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_worker/services/IngestsWorkerService.scala index 12f7d1574f..453bc3da40 100644 --- a/ingests/ingests_worker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_worker/services/IngestsWorkerService.scala +++ b/ingests/ingests_worker/src/main/scala/uk/ac/wellcome/platform/storage/ingests_worker/services/IngestsWorkerService.scala @@ -27,7 +27,8 @@ import uk.ac.wellcome.platform.archive.common.ingests.models.{ import uk.ac.wellcome.platform.storage.ingests_tracker.client.{ IngestTrackerClient, IngestTrackerUnknownUpdateError, - IngestTrackerUpdateConflictError + IngestTrackerUpdateConflictError, + IngestTrackerUpdateNonExistentIngestError } import uk.ac.wellcome.typesafe.Runnable @@ -66,6 +67,14 @@ class IngestsWorkerService( ) warn(err) DeterministicFailure[Ingest](err) + // This may be caused by something like a consistency issue in DynamoDB; + // if we retry later the ingest may become available. + // See https://github.com/wellcomecollection/platform/issues/4781 + case Left(IngestTrackerUpdateNonExistentIngestError(_)) => + error(s"Could not apply $ingestUpdate to non-existent ingest") + NonDeterministicFailure[Ingest]( + new Throwable(s"Could not apply $ingestUpdate to non-existent ingest") + ) case Left(IngestTrackerUnknownUpdateError(_, err)) => error(s"Error trying to apply $ingestUpdate, got UnknownError", err) NonDeterministicFailure[Ingest](err) diff --git a/ingests/ingests_worker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_worker/services/IngestsWorkerServiceTest.scala b/ingests/ingests_worker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_worker/services/IngestsWorkerServiceTest.scala index 42b68741d5..fdf1572ca7 100644 --- a/ingests/ingests_worker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_worker/services/IngestsWorkerServiceTest.scala +++ b/ingests/ingests_worker/src/test/scala/uk/ac/wellcome/platform/storage/ingests_worker/services/IngestsWorkerServiceTest.scala @@ -1,8 +1,10 @@ package uk.ac.wellcome.platform.storage.ingests_worker.services +import akka.http.scaladsl.model.Uri import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures} import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers +import uk.ac.wellcome.fixtures.TestWith import uk.ac.wellcome.messaging.worker.models.{ DeterministicFailure, NonDeterministicFailure, @@ -11,6 +13,11 @@ import uk.ac.wellcome.messaging.worker.models.{ } import uk.ac.wellcome.platform.archive.common.fixtures.HttpFixtures import uk.ac.wellcome.platform.archive.common.ingests.models.Ingest +import uk.ac.wellcome.platform.storage.ingests_tracker.client.{ + AkkaIngestTrackerClient, + IngestTrackerClient +} +import uk.ac.wellcome.platform.storage.ingests_tracker.fixtures.IngestsTrackerApiFixture import uk.ac.wellcome.platform.storage.ingests_worker.fixtures.IngestsWorkerFixtures class IngestsWorkerServiceTest @@ -20,6 +27,7 @@ class IngestsWorkerServiceTest with HttpFixtures with ScalaFutures with IngestsWorkerFixtures + with IngestsTrackerApiFixture with IntegrationPatience { val visibilityTimeout = 5 @@ -71,4 +79,26 @@ class IngestsWorkerServiceTest } } } + + it("updating a non-existent ingest is a non-deterministic failure") { + withIngestsTrackerApi(initialIngests = Seq.empty) { _ => + withIngestTrackerClient(trackerUri) { client => + withIngestWorker(ingestTrackerClient = client) { worker => + whenReady(worker.processMessage(createIngestEventUpdate)) { + _ shouldBe a[NonDeterministicFailure[_]] + } + } + } + } + } + + def withIngestTrackerClient[R]( + trackerUri: String + )(testWith: TestWith[IngestTrackerClient, R]): R = + withActorSystem { implicit actorSystem => + val client = new AkkaIngestTrackerClient(trackerHost = Uri(trackerUri)) + + testWith(client) + } + }