From 026d039056a9382d3cd05effe9560119e598bf05 Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Tue, 19 Mar 2024 17:44:11 +0100 Subject: [PATCH 01/24] Add ResourceProcessor --- .../ch/epfl/bluebrain/nexus/ship/Main.scala | 4 +- .../ship/resources/ResourceProcessor.scala | 109 ++++++++++++++++++ ship/src/test/resources/import/import.json | 1 + .../epfl/bluebrain/nexus/ship/MainSuite.scala | 2 + 4 files changed, 115 insertions(+), 1 deletion(-) create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala index c353f4d64f..0ea306859a 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala @@ -17,6 +17,7 @@ import ch.epfl.bluebrain.nexus.ship.model.InputEvent import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor +import ch.epfl.bluebrain.nexus.ship.resources.ResourceProcessor import com.monovore.decline.Opts import com.monovore.decline.effect.CommandIOApp import fs2.Stream @@ -80,7 +81,8 @@ object Main fetchActiveOrg = FetchActiveOrganization(xas) projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, xas)(baseUri) resolverProcessor <- ResolverProcessor(fetchContext, eventLogConfig, xas) - report <- EventProcessor.run(events, projectProcessor, resolverProcessor) + resourceProcessor <- ResourceProcessor(eventLogConfig, fetchContext, xas) + report <- EventProcessor.run(events, projectProcessor, resolverProcessor, resourceProcessor) } yield report } } yield report diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala new file mode 100644 index 0000000000..ca8d1d1151 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala @@ -0,0 +1,109 @@ +package ch.epfl.bluebrain.nexus.ship.resources + +import cats.effect.IO +import cats.implicits.catsSyntaxOptionId +import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi +import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller +import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdAssembly +import ch.epfl.bluebrain.nexus.delta.sdk.model.{IdSegment, IdSegmentRef, ResourceF} +import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext +import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution +import ch.epfl.bluebrain.nexus.delta.sdk.resources.ValidationResult.NoValidation +import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceEvent +import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceEvent._ +import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceRejection.{IncorrectRev, ResourceAlreadyExists} +import ch.epfl.bluebrain.nexus.delta.sdk.resources._ +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.Schema +import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef, ResourceRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors} +import ch.epfl.bluebrain.nexus.ship.resources.ResourceProcessor.logger +import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, FailingUUID, ImportStatus} +import io.circe.Decoder + +class ResourceProcessor private (resources: Resources, clock: EventClock) extends EventProcessor[ResourceEvent] { + + override def resourceType: EntityType = Resources.entityType + + override def decoder: Decoder[ResourceEvent] = ResourceEvent.serializer.codec + + override def evaluate(event: ResourceEvent): IO[ImportStatus] = + for { + _ <- clock.setInstant(event.instant) + result <- evaluateInternal(event) + } yield result + + private def evaluateInternal(event: ResourceEvent): IO[ImportStatus] = { + implicit val s: Subject = event.subject + implicit val c: Caller = Caller(s, Set.empty) + val cRev = event.rev - 1 + + implicit class ResourceRefOps(ref: ResourceRef) { + def toIdSegment: IdSegment = IdSegmentRef(ref).value + } + + event match { + case e: ResourceCreated => + resources.create(e.id, e.project, e.schema.toIdSegment, e.source, e.tag) + case e: ResourceUpdated => + resources.update(e.id, event.project, e.schema.toIdSegment.some, cRev, e.source, e.tag) + case e: ResourceSchemaUpdated => + resources.updateAttachedSchema(e.id, e.project, e.schema.toIdSegment) + case e: ResourceRefreshed => + resources.refresh(e.id, e.project, e.schema.toIdSegment.some) + case e: ResourceTagAdded => // todo deal with schema + resources.tag(e.id, e.project, None, e.tag, e.targetRev, cRev) + case e: ResourceTagDeleted => // todo deal with schema + resources.deleteTag(e.id, e.project, None, e.tag, cRev) + case e: ResourceDeprecated => // todo deal with schema + resources.deprecate(e.id, e.project, None, cRev) + case e: ResourceUndeprecated => // todo deal with schema + resources.undeprecate(e.id, e.project, None, cRev) + } + }.redeemWith( + { + case a: ResourceAlreadyExists => logger.warn(a)("The resource already exists").as(ImportStatus.Dropped) + case i: IncorrectRev => logger.warn(i)("An incorrect revision as been provided").as(ImportStatus.Dropped) + case other => IO.raiseError(other) + }, + _ => IO.pure(ImportStatus.Success) + ) + +} + +object ResourceProcessor { + + private val logger = Logger[ResourceProcessor] + + def apply( + eventLogConfig: EventLogConfig, + fetchContext: FetchContext, + xas: Transactors + )(implicit jsonLdApi: JsonLdApi): IO[ResourceProcessor] = + EventClock.init().map { clock => + implicit val uuidF: UUIDF = FailingUUID + + val detectChange = DetectChange(false) + + val validate = new ValidateResource { + override def apply(jsonld: JsonLdAssembly, schema: SchemaClaim, enforceSchema: Boolean): IO[ValidationResult] = + IO.pure(NoValidation(ProjectRef.unsafe("org", "proj"))) + override def apply(jsonld: JsonLdAssembly, schema: ResourceF[Schema]): IO[ValidationResult] = + IO.pure(NoValidation(ProjectRef.unsafe("org", "proj"))) + } + + val resourceDef = Resources.definition(validate, detectChange, clock) + val resourceLog = ScopedEventLog(resourceDef, eventLogConfig, xas) + + val resources = ResourcesImpl( + resourceLog, + fetchContext, + ResolverContextResolution.never + ) + new ResourceProcessor(resources, clock) + } + +} diff --git a/ship/src/test/resources/import/import.json b/ship/src/test/resources/import/import.json index 88f755d36c..b3d6cf7c17 100644 --- a/ship/src/test/resources/import/import.json +++ b/ship/src/test/resources/import/import.json @@ -4,6 +4,7 @@ {"ordering":2173453,"type":"resolver","org":"public","project":"sscx","id":"https://bluebrain.github.io/nexus/vocabulary/crossProject1","rev":2,"value":{"id": "https://bluebrain.github.io/nexus/vocabulary/crossProject1", "rev": 2, "tpe": "CrossProject", "@type": "ResolverDeprecated", "instant": "2020-01-28T08:23:36.270Z", "project": "public/sscx", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}},"instant":"2020-01-28T09:23:36.27+01:00"} {"ordering":2408475,"type":"resolver","org":"public","project":"sscx","id":"https://bluebrain.github.io/nexus/vocabulary/crossProject2","rev":1,"value":{"id": "https://bluebrain.github.io/nexus/vocabulary/crossProject2", "rev": 1, "@type": "ResolverCreated", "value": {"@type": "CrossProjectValue", "priority": 50, "projects": ["neurosciencegraph/datamodels"], "resourceTypes": [], "identityResolution": {"@type": "ProvidedIdentities", "value": [{"@type": "Authenticated", "realm": "bbp"}]}}, "source": {"@id": "https://bluebrain.github.io/nexus/vocabulary/crossProject2", "@type": ["CrossProject", "Resolver"], "@context": "https://bluebrain.github.io/nexus/contexts/resolvers.json", "priority": 50, "projects": ["neurosciencegraph/datamodels"], "identities": [{"@id": "https://bbp.epfl.ch/nexus/v1/realms/bbp/authenticated", "@type": "Authenticated", "realm": "bbp"}]}, "instant": "2020-03-09T13:36:19.246Z", "project": "public/sscx", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}},"instant":"2020-03-09T14:36:19.246+01:00"} {"ordering":4879496,"type":"resolver","org":"public","project":"sscx","id":"https://bluebrain.github.io/nexus/vocabulary/crossProject2","rev":2,"value":{"id": "https://bluebrain.github.io/nexus/vocabulary/crossProject2", "rev": 2, "@type": "ResolverUpdated", "value": {"@type": "CrossProjectValue", "priority": 50, "projects": ["neurosciencegraph/datamodels"], "resourceTypes": [], "identityResolution": {"@type": "UseCurrentCaller"}}, "source": {"@id": "https://bluebrain.github.io/nexus/vocabulary/crossProject2", "@type": ["Resolver", "CrossProject"], "@context": ["https://bluebrain.github.io/nexus/contexts/resolvers.json", "https://bluebrain.github.io/nexus/contexts/metadata.json"], "priority": 50, "projects": ["neurosciencegraph/datamodels"], "resourceTypes": [], "useCurrentCaller": true}, "instant": "2022-11-16T13:42:07.498Z", "project": "public/sscx", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}},"instant":"2022-11-16T14:42:07.498+01:00"} +{"ordering":4900000,"type":"resource","org":"public","project":"sscx","id":"https://bluebrain.github.io/nexus/vocabulary/crossProject2","rev":1,"value":{"id":"https://staging.nise.bbp.epfl.ch/nexus/v1/resources/tests/oliver/_/6285253a-0ed4-4d98-8821-fe030c3fef40","rev":1,"@type":"ResourceCreated","types":[],"schema":"https://bluebrain.github.io/nexus/schemas/unconstrained.json?rev=1","source":{"hello":"world"},"instant":"2024-03-15T17:20:08.294513Z","project":"public/sscx","subject":{"@type":"User","realm":"bbp","subject":"grabinsk"},"expanded":[{"@id":"https://staging.nise.bbp.epfl.ch/nexus/v1/resources/tests/oliver/_/6285253a-0ed4-4d98-8821-fe030c3fef40","https://staging.nise.bbp.epfl.ch/nexus/v1/vocabs/tests/oliver/hello":[{"@value":"world"}]}],"compacted":{"@id":"6285253a-0ed4-4d98-8821-fe030c3fef40","hello":"world","@context":{"@base":"https://staging.nise.bbp.epfl.ch/nexus/v1/resources/tests/oliver/_/","@vocab":"https://staging.nise.bbp.epfl.ch/nexus/v1/vocabs/tests/oliver/"}},"schemaProject":"tests/oliver","remoteContexts":[]},"instant":"2099-12-30T23:59:59.999+01:00"} {"ordering":5300965,"type":"project","org":"public","project":"sscx","id":"projects/public/sscx","rev":2,"value":{"rev": 2, "base": "https://bbp.epfl.ch/neurosciencegraph/data/", "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectUpdated", "label": "sscx", "vocab": "https://bbp.epfl.ch/ontologies/core/bmo/", "instant": "2023-07-16T18:42:59.530Z", "subject": {"@type": "User", "realm": "bbp", "subject": "alice"}, "apiMappings": {"prov": "http://www.w3.org/ns/prov#", "context": "https://incf.github.io/neuroshapes/contexts/", "schemaorg": "http://schema.org/", "datashapes": "https://neuroshapes.org/dash/", "ontologies": "https://neuroshapes.org/dash/ontology", "taxonomies": "https://neuroshapes.org/dash/taxonomy", "commonshapes": "https://neuroshapes.org/commons/", "provdatashapes": "https://provshapes.org/datashapes/", "provcommonshapes": "https://provshapes.org/commons/"}, "description": "This project contains somatosensorycortex dissemination publication data.", "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2023-07-16T20:42:59.53+02:00"} {"ordering":5318566,"type":"project","org":"public","project":"sscx","id":"projects/public/sscx","rev":3,"value":{"rev": 3, "base": "https://bbp.epfl.ch/nexus/v1/resources/public/sscx/_/", "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectUpdated", "label": "sscx", "vocab": "https://bbp.epfl.ch/ontologies/core/bmo/", "instant": "2023-07-21T13:55:02.463Z", "subject": {"@type": "User", "realm": "bbp", "subject": "alice"}, "apiMappings": {"prov": "http://www.w3.org/ns/prov#", "context": "https://incf.github.io/neuroshapes/contexts/", "schemaorg": "http://schema.org/", "datashapes": "https://neuroshapes.org/dash/", "ontologies": "https://neuroshapes.org/dash/ontology", "taxonomies": "https://neuroshapes.org/dash/taxonomy", "commonshapes": "https://neuroshapes.org/commons/", "provdatashapes": "https://provshapes.org/datashapes/", "provcommonshapes": "https://provshapes.org/commons/"}, "description": "This project contains somatosensorycortex dissemination publication data.", "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2023-07-21T15:55:02.463+02:00"} {"ordering":5418473,"type":"project","org":"public","project":"sscx","id":"projects/public/sscx","rev":4,"value":{"rev": 4, "base": "https://bbp.epfl.ch/data/public/sscx/", "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectUpdated", "label": "sscx", "vocab": "https://bbp.epfl.ch/ontologies/core/bmo/", "instant": "2023-08-22T15:05:13.654Z", "subject": {"@type": "User", "realm": "bbp", "subject": "alice"}, "apiMappings": {"prov": "http://www.w3.org/ns/prov#", "context": "https://incf.github.io/neuroshapes/contexts/", "schemaorg": "http://schema.org/", "datashapes": "https://neuroshapes.org/dash/", "ontologies": "https://neuroshapes.org/dash/ontology", "taxonomies": "https://neuroshapes.org/dash/taxonomy", "commonshapes": "https://neuroshapes.org/commons/", "provdatashapes": "https://provshapes.org/datashapes/", "provcommonshapes": "https://provshapes.org/commons/"}, "description": "This project contains somatosensorycortex dissemination publication data.", "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2023-08-22T17:05:13.654+02:00"} diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala index 1bbcf4f93c..27975d46da 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala @@ -5,6 +5,7 @@ import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceLoader import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.Resolvers +import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie.{PostgresPassword, PostgresUser} @@ -29,6 +30,7 @@ class MainSuite extends NexusSuite with MainSuite.Fixture { Map( Projects.entityType -> Count(5L, 0L), Resolvers.entityType -> Count(5L, 0L), + Resources.entityType -> Count(1L, 0L), EntityType("xxx") -> Count(0L, 1L) ) ) From 798503f2515129254d95ad9a2d2098ce08d208c4 Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Wed, 20 Mar 2024 09:26:21 +0100 Subject: [PATCH 02/24] Remove comments --- .../nexus/ship/resources/ResourceProcessor.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala index ca8d1d1151..b68b9f8a28 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala @@ -54,13 +54,13 @@ class ResourceProcessor private (resources: Resources, clock: EventClock) extend resources.updateAttachedSchema(e.id, e.project, e.schema.toIdSegment) case e: ResourceRefreshed => resources.refresh(e.id, e.project, e.schema.toIdSegment.some) - case e: ResourceTagAdded => // todo deal with schema + case e: ResourceTagAdded => resources.tag(e.id, e.project, None, e.tag, e.targetRev, cRev) - case e: ResourceTagDeleted => // todo deal with schema + case e: ResourceTagDeleted => resources.deleteTag(e.id, e.project, None, e.tag, cRev) - case e: ResourceDeprecated => // todo deal with schema + case e: ResourceDeprecated => resources.deprecate(e.id, e.project, None, cRev) - case e: ResourceUndeprecated => // todo deal with schema + case e: ResourceUndeprecated => resources.undeprecate(e.id, e.project, None, cRev) } }.redeemWith( From b3801039fd16b057572999d317eb8a23a5de2bf5 Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Wed, 20 Mar 2024 16:29:39 +0100 Subject: [PATCH 03/24] Initial E2E test --- build.sbt | 2 +- .../ch/epfl/bluebrain/nexus/ship/Main.scala | 55 +------------- .../epfl/bluebrain/nexus/ship/RunShip.scala | 61 +++++++++++++++ .../bluebrain/nexus/ship/EndToEndTest.scala | 76 +++++++++++++++++++ .../epfl/bluebrain/nexus/ship/MainSuite.scala | 2 +- tests/docker/docker-compose.yml | 3 + .../nexus/tests/BaseIntegrationSpec.scala | 2 +- .../epfl/bluebrain/nexus/tests/Identity.scala | 4 +- 8 files changed, 149 insertions(+), 56 deletions(-) create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala create mode 100644 ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala diff --git a/build.sbt b/build.sbt index 02f702432c..7388bbc8b4 100755 --- a/build.sbt +++ b/build.sbt @@ -734,7 +734,7 @@ lazy val ship = project ) .enablePlugins(UniversalPlugin, JavaAppPackaging, JavaAgent, DockerPlugin, BuildInfoPlugin) .settings(shared, compilation, servicePackaging, assertJavaVersion, kamonSettings, coverage, release) - .dependsOn(sdk % "compile->compile;test->test", testkit % "test->compile") + .dependsOn(sdk % "compile->compile;test->test", tests % "test->compile") .settings( libraryDependencies ++= Seq(declineEffect), addCompilerPlugin(betterMonadicFor), diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala index 0ea306859a..66e1cbdc9d 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala @@ -1,28 +1,14 @@ package ch.epfl.bluebrain.nexus.ship -import cats.effect.{Clock, ExitCode, IO} +import cats.effect.{ExitCode, IO} import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger -import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF -import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi} -import ch.epfl.bluebrain.nexus.delta.sdk.organizations.FetchActiveOrganization -import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext -import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings -import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas import ch.epfl.bluebrain.nexus.delta.ship.BuildInfo -import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.ship.config.ShipConfig -import ch.epfl.bluebrain.nexus.ship.model.InputEvent -import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider -import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor -import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor -import ch.epfl.bluebrain.nexus.ship.resources.ResourceProcessor import com.monovore.decline.Opts import com.monovore.decline.effect.CommandIOApp -import fs2.Stream -import fs2.io.file.{Files, Path} -import io.circe.parser._ +import fs2.io.file.Path object Main extends CommandIOApp( @@ -55,39 +41,11 @@ object Main override def main: Opts[IO[ExitCode]] = (run orElse showConfig) .map { - case Run(file, config, _) => run(file, config) + case Run(file, config, _) => new RunShip().run(file, config) case ShowConfig(config) => showConfig(config) } .map(_.as(ExitCode.Success)) - private[ship] def run(file: Path, config: Option[Path]): IO[ImportReport] = { - val clock = Clock[IO] - val uuidF = UUIDF.random - // Resources may have been created with different configurations so we adopt the lenient one for the import - implicit val jsonLdApi: JsonLdApi = JsonLdJavaApi.lenient - for { - _ <- logger.info(s"Running the import with file $file, config $config and from offset $offset") - config <- ShipConfig.load(config) - report <- Transactors.init(config.database).use { xas => - val orgProvider = - OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF) - val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled) - val eventLogConfig = config.eventLog - val baseUri = config.baseUri - for { - // Provision organizations - _ <- orgProvider.create(config.organizations.values) - events = eventStream(file) - fetchActiveOrg = FetchActiveOrganization(xas) - projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, xas)(baseUri) - resolverProcessor <- ResolverProcessor(fetchContext, eventLogConfig, xas) - resourceProcessor <- ResourceProcessor(eventLogConfig, fetchContext, xas) - report <- EventProcessor.run(events, projectProcessor, resolverProcessor, resourceProcessor) - } yield report - } - } yield report - } - private[ship] def showConfig(config: Option[Path]) = for { _ <- logger.info(s"Showing reconciled config") @@ -95,13 +53,6 @@ object Main _ <- logger.info(config.root().render()) } yield () - private def eventStream(file: Path): Stream[IO, InputEvent] = - Files[IO].readUtf8Lines(file).zipWithIndex.evalMap { case (line, index) => - IO.fromEither(decode[InputEvent](line)).onError { err => - logger.error(err)(s"Error parsing to event at line $index") - } - } - sealed private trait Command final private case class Run(file: Path, config: Option[Path], offset: Offset) extends Command diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala new file mode 100644 index 0000000000..d0f547d325 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala @@ -0,0 +1,61 @@ +package ch.epfl.bluebrain.nexus.ship + +import cats.effect.{Clock, IO} +import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi} +import ch.epfl.bluebrain.nexus.delta.sdk.organizations.FetchActiveOrganization +import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext +import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings +import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas +import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors +import ch.epfl.bluebrain.nexus.ship.config.ShipConfig +import ch.epfl.bluebrain.nexus.ship.model.InputEvent +import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider +import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor +import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor +import ch.epfl.bluebrain.nexus.ship.resources.ResourceProcessor +import fs2.Stream +import fs2.io.file.{Files, Path} +import io.circe.parser.decode + +class RunShip { + + private val logger = Logger[RunShip] + + def run(file: Path, config: Option[Path]): IO[ImportReport] = { + val clock = Clock[IO] + val uuidF = UUIDF.random + // Resources may have been created with different configurations so we adopt the lenient one for the import + implicit val jsonLdApi: JsonLdApi = JsonLdJavaApi.lenient + for { + _ <- logger.info(s"Running the import with file $file, config $config") + config <- ShipConfig.load(config) + report <- Transactors.init(config.database).use { xas => + val orgProvider = + OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF) + val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled) + val eventLogConfig = config.eventLog + val baseUri = config.baseUri + for { + // Provision organizations + _ <- orgProvider.create(config.organizations.values) + events = eventStream(file) + fetchActiveOrg = FetchActiveOrganization(xas) + projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, xas)(baseUri) + resolverProcessor <- ResolverProcessor(fetchContext, eventLogConfig, xas) + resourceProcessor <- ResourceProcessor(eventLogConfig, fetchContext, xas) + report <- EventProcessor.run(events, projectProcessor, resolverProcessor, resourceProcessor) + } yield report + } + } yield report + } + + private def eventStream(file: Path): Stream[IO, InputEvent] = + Files[IO].readUtf8Lines(file).zipWithIndex.evalMap { case (line, index) => + IO.fromEither(decode[InputEvent](line)).onError { err => + logger.error(err)(s"Error parsing to event at line $index") + } + } + +} diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala new file mode 100644 index 0000000000..448513f51e --- /dev/null +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala @@ -0,0 +1,76 @@ +package ch.epfl.bluebrain.nexus.ship + +import akka.http.scaladsl.model.StatusCodes +import cats.data.NonEmptyList +import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.ExportEventQuery +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.tests.BaseIntegrationSpec +import ch.epfl.bluebrain.nexus.tests.Identity.writer +import ch.epfl.bluebrain.nexus.tests.iam.types.Permission +import ch.epfl.bluebrain.nexus.tests.iam.types.Permission.Export +import io.circe.Json +import io.circe.syntax.EncoderOps + +import java.nio.file.{Files, Paths} +import scala.concurrent.duration.DurationInt +import scala.jdk.CollectionConverters.IteratorHasAsScala + +class EndToEndTest extends BaseIntegrationSpec { + + val project: ProjectRef = ProjectRef.unsafe(genString(), genString()) + + override def beforeAll(): Unit = { + super.beforeAll() + aclDsl.addPermission(s"/", writer, Export.Run).accepted + () + } + + "The ship" should { + "transfer a project" in { + val query = ExportEventQuery( + Label.unsafe(project.project.value), + NonEmptyList.of(project), + Offset.start + ).asJson + + val createProject = createProjects(writer, project.organization.value, project.project.value) + + val runExport = deltaClient.post[Json]("/export/events", query, writer) { (_, response) => + response.status shouldEqual StatusCodes.Accepted + } + val deleteProject = + deltaClient.delete[Json](s"/projects/${project.organization}/${project.project}?rev=1&prune=true", writer) { + (_, response) => response.status shouldEqual StatusCodes.OK + } + val fetchProject = deltaClient.get[Json](s"/projects/${project.organization}/${project.project}", writer) { + (_, response) => response.status shouldEqual StatusCodes.NotFound + } + + val findFile = IO.delay { + val folder = s"/tmp/ship/${project.project.value}/" + val folderPath = Paths.get(folder) + Files.newDirectoryStream(folderPath, "*.json").iterator().asScala.toList.head + } + + val runShip = findFile + .flatMap { filePath => + new RunShip().run(fs2.io.file.Path.fromNioPath(filePath), None) + } + .map { _ => 1 shouldEqual 1 } + + val setPermissions = aclDsl.addPermissions(s"/$project", writer, Permission.minimalPermissions) + + val fetchProjectSuccess = deltaClient.get[Json](s"/projects/${project.organization}/${project.project}", writer) { + (_, response) => response.status shouldEqual StatusCodes.OK + } + + (createProject >> runExport >> IO.sleep(6.seconds) >> deleteProject >> eventually { + fetchProject + } >> runShip >> setPermissions >> fetchProjectSuccess).accepted + + } + } + +} diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala index 27975d46da..f2dcc8279e 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala @@ -36,7 +36,7 @@ class MainSuite extends NexusSuite with MainSuite.Fixture { ) for { importFile <- ClasspathResourceLoader().absolutePath("import/import.json").map(Path(_)) - _ <- Main.run(importFile, None).assertEquals(expected) + _ <- new RunShip().run(importFile, None).assertEquals(expected) } yield () } diff --git a/tests/docker/docker-compose.yml b/tests/docker/docker-compose.yml index 32161cf1a4..0107119452 100644 --- a/tests/docker/docker-compose.yml +++ b/tests/docker/docker-compose.yml @@ -36,6 +36,7 @@ services: volumes: - ./config:/config - /tmp:/default-volume + - /tmp/ship:/tmp extra_hosts: - "delta:127.0.0.1" dns: @@ -138,6 +139,8 @@ services: postgres: image: library/postgres:15.6 + ports: + - 5432:5432 environment: POSTGRES_USER: "postgres" POSTGRES_PASSWORD: "postgres" diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/BaseIntegrationSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/BaseIntegrationSpec.scala index c6e972f3e6..eff92acff2 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/BaseIntegrationSpec.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/BaseIntegrationSpec.scala @@ -64,7 +64,7 @@ trait BaseIntegrationSpec val deltaUrl: Uri = Uri(s"http://${sys.props.getOrElse("delta-url", "localhost:8080")}/v1") - private[tests] val deltaClient = HttpClient(deltaUrl) + val deltaClient: HttpClient = HttpClient(deltaUrl) val elasticsearchDsl = new ElasticsearchDsl() val blazegraphDsl = new BlazegraphDsl() diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/Identity.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/Identity.scala index 9325ae5990..a8dce65e92 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/Identity.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/Identity.scala @@ -36,6 +36,8 @@ object Identity extends Generators { // User with an invalid token val InvalidTokenUser: UserCredentials = UserCredentials(genString(), genString(), testRealm) + val writer = UserCredentials(genString(), genString(), testRealm) + object acls { val Marge = UserCredentials(genString(), genString(), testRealm) } @@ -107,6 +109,6 @@ object Identity extends Generators { } lazy val allUsers = - userPermissions.UserWithNoPermissions :: userPermissions.UserWithPermissions :: acls.Marge :: archives.Tweety :: compositeviews.Jerry :: events.BugsBunny :: listings.Bob :: listings.Alice :: aggregations.Charlie :: aggregations.Rose :: orgs.Fry :: orgs.Leela :: projects.Bojack :: projects.PrincessCarolyn :: resources.Rick :: resources.Morty :: storages.Coyote :: views.ScoobyDoo :: mash.Radar :: supervision.Mickey :: files.Writer :: typehierarchy.Writer :: Nil + userPermissions.UserWithNoPermissions :: userPermissions.UserWithPermissions :: acls.Marge :: archives.Tweety :: compositeviews.Jerry :: events.BugsBunny :: listings.Bob :: listings.Alice :: aggregations.Charlie :: aggregations.Rose :: orgs.Fry :: orgs.Leela :: projects.Bojack :: projects.PrincessCarolyn :: resources.Rick :: resources.Morty :: storages.Coyote :: views.ScoobyDoo :: mash.Radar :: supervision.Mickey :: files.Writer :: typehierarchy.Writer :: writer :: Nil } From 338bd916a3ea625ce81ace07b7c5da6ce07d166a Mon Sep 17 00:00:00 2001 From: Daniel Bell Date: Wed, 20 Mar 2024 16:55:22 +0100 Subject: [PATCH 04/24] Refactor E2E test --- .../bluebrain/nexus/ship/EndToEndTest.scala | 83 ++++++++++++------- 1 file changed, 53 insertions(+), 30 deletions(-) diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala index 448513f51e..487ede9910 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala @@ -12,6 +12,7 @@ import ch.epfl.bluebrain.nexus.tests.iam.types.Permission import ch.epfl.bluebrain.nexus.tests.iam.types.Permission.Export import io.circe.Json import io.circe.syntax.EncoderOps +import org.scalatest.Assertion import java.nio.file.{Files, Paths} import scala.concurrent.duration.DurationInt @@ -19,58 +20,80 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala class EndToEndTest extends BaseIntegrationSpec { - val project: ProjectRef = ProjectRef.unsafe(genString(), genString()) - override def beforeAll(): Unit = { super.beforeAll() aclDsl.addPermission(s"/", writer, Export.Run).accepted () } + + "The ship" should { + "transfer a project" in { + + val project = thereIsAProject() + + whenTheExportIsRunOnProject(project) + + theOldProjectIsDeleted(project) + + weRunTheImporter(project) + + weFixThePermissions(project) + + thereShouldBeAProject(project) + } + + def thereIsAProject(): ProjectRef = { + val project: ProjectRef = ProjectRef.unsafe(genString(), genString()) + createProjects(writer, project.organization.value, project.project.value).accepted + project + } + + def whenTheExportIsRunOnProject(project: ProjectRef): Unit = { val query = ExportEventQuery( Label.unsafe(project.project.value), NonEmptyList.of(project), Offset.start ).asJson - val createProject = createProjects(writer, project.organization.value, project.project.value) - - val runExport = deltaClient.post[Json]("/export/events", query, writer) { (_, response) => + deltaClient.post[Json]("/export/events", query, writer) { (_, response) => response.status shouldEqual StatusCodes.Accepted - } - val deleteProject = - deltaClient.delete[Json](s"/projects/${project.organization}/${project.project}?rev=1&prune=true", writer) { - (_, response) => response.status shouldEqual StatusCodes.OK - } - val fetchProject = deltaClient.get[Json](s"/projects/${project.organization}/${project.project}", writer) { - (_, response) => response.status shouldEqual StatusCodes.NotFound - } - - val findFile = IO.delay { - val folder = s"/tmp/ship/${project.project.value}/" - val folderPath = Paths.get(folder) - Files.newDirectoryStream(folderPath, "*.json").iterator().asScala.toList.head - } - - val runShip = findFile - .flatMap { filePath => - new RunShip().run(fs2.io.file.Path.fromNioPath(filePath), None) - } - .map { _ => 1 shouldEqual 1 } + }.accepted - val setPermissions = aclDsl.addPermissions(s"/$project", writer, Permission.minimalPermissions) + IO.sleep(6.seconds).accepted + } - val fetchProjectSuccess = deltaClient.get[Json](s"/projects/${project.organization}/${project.project}", writer) { + def theOldProjectIsDeleted(project: ProjectRef): Unit = { + deltaClient.delete[Json](s"/projects/${project.organization}/${project.project}?rev=1&prune=true", writer) { (_, response) => response.status shouldEqual StatusCodes.OK + }.accepted + + eventually { + deltaClient.get[Json](s"/projects/${project.organization}/${project.project}", writer) { + (_, response) => response.status shouldEqual StatusCodes.NotFound + } } + () + } - (createProject >> runExport >> IO.sleep(6.seconds) >> deleteProject >> eventually { - fetchProject - } >> runShip >> setPermissions >> fetchProjectSuccess).accepted + def weRunTheImporter(project: ProjectRef): Unit = { + val folder = s"/tmp/ship/${project.project.value}/" + val folderPath = Paths.get(folder) + val file = Files.newDirectoryStream(folderPath, "*.json").iterator().asScala.toList.head + new RunShip().run(fs2.io.file.Path.fromNioPath(file), None).accepted + () } + + def thereShouldBeAProject(project: ProjectRef): Assertion = { + deltaClient.get[Json](s"/projects/${project.organization}/${project.project}", writer) { + (_, response) => response.status shouldEqual StatusCodes.OK + }.accepted + } + + def weFixThePermissions(project: ProjectRef) = aclDsl.addPermissions(s"/$project", writer, Permission.minimalPermissions).accepted } } From b15c88ebdad767d7b2532e81652d6852f97e4bfb Mon Sep 17 00:00:00 2001 From: Daniel Bell Date: Thu, 21 Mar 2024 00:47:59 +0100 Subject: [PATCH 05/24] Test JSON payload matches --- .../bluebrain/nexus/ship/EndToEndTest.scala | 17 +++++++++------ .../bluebrain/nexus/tests/HttpClient.scala | 21 +++++++++++++++++++ 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala index 487ede9910..802079eeb5 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala @@ -32,7 +32,7 @@ class EndToEndTest extends BaseIntegrationSpec { "transfer a project" in { - val project = thereIsAProject() + val (project, projectJson) = thereIsAProject() whenTheExportIsRunOnProject(project) @@ -42,13 +42,15 @@ class EndToEndTest extends BaseIntegrationSpec { weFixThePermissions(project) - thereShouldBeAProject(project) + thereShouldBeAProject(project, projectJson) } - def thereIsAProject(): ProjectRef = { + def thereIsAProject(): (ProjectRef, Json) = { val project: ProjectRef = ProjectRef.unsafe(genString(), genString()) createProjects(writer, project.organization.value, project.project.value).accepted - project + val (projectJson, status) = deltaClient.getJsonAndStatus(s"/projects/${project.organization}/${project.project}", writer).accepted + status shouldEqual StatusCodes.OK + project -> projectJson } def whenTheExportIsRunOnProject(project: ProjectRef): Unit = { @@ -87,9 +89,12 @@ class EndToEndTest extends BaseIntegrationSpec { () } - def thereShouldBeAProject(project: ProjectRef): Assertion = { + def thereShouldBeAProject(project: ProjectRef, originalJson: Json): Assertion = { deltaClient.get[Json](s"/projects/${project.organization}/${project.project}", writer) { - (_, response) => response.status shouldEqual StatusCodes.OK + (json, response) => { + response.status shouldEqual StatusCodes.OK + json shouldEqual originalJson + } }.accepted } diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/HttpClient.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/HttpClient.scala index 76e8d5a29c..8a542a5ab5 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/HttpClient.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/HttpClient.scala @@ -200,6 +200,10 @@ class HttpClient private (baseUrl: Uri, httpExt: HttpExt)(implicit requestJson(GET, url, None, identity, (a: A, _: HttpResponse) => a, jsonHeaders) } + def getJsonAndStatus(url: String, identity: Identity)(implicit um: FromEntityUnmarshaller[Json]): IO[(Json, StatusCode)] = { + requestJsonAndStatus(GET, url, None, identity, jsonHeaders) + } + def delete[A](url: String, identity: Identity, extraHeaders: Seq[HttpHeader] = jsonHeaders)( assertResponse: (A, HttpResponse) => Assertion )(implicit um: FromEntityUnmarshaller[A]): IO[Assertion] = @@ -261,6 +265,23 @@ class HttpClient private (baseUrl: Uri, httpExt: HttpExt)(implicit ) } + def requestJsonAndStatus( + method: HttpMethod, + url: String, + body: Option[Json], + identity: Identity, + extraHeaders: Seq[HttpHeader] + )(implicit um: FromEntityUnmarshaller[Json]): IO[(Json, StatusCode)] = + request[Json, Json, (Json, StatusCode)]( + method, + url, + body, + identity, + (j: Json) => HttpEntity(ContentTypes.`application/json`, j.noSpaces), + (json, response) => (json, response.status), + extraHeaders + ) + def requestJson[A, R]( method: HttpMethod, url: String, From 72d306fc58869c319cee33293094b153c5404ed0 Mon Sep 17 00:00:00 2001 From: Daniel Bell Date: Thu, 21 Mar 2024 10:56:41 +0100 Subject: [PATCH 06/24] Fix EventClock losing precision --- .../bluebrain/nexus/testkit/scalatest/ce/CatsIOValues.scala | 5 +---- .../main/scala/ch/epfl/bluebrain/nexus/ship/EventClock.scala | 4 +++- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/scalatest/ce/CatsIOValues.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/scalatest/ce/CatsIOValues.scala index a9f35012bc..d9e2e2ab35 100644 --- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/scalatest/ce/CatsIOValues.scala +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/scalatest/ce/CatsIOValues.scala @@ -14,10 +14,7 @@ trait CatsIOValues { implicit final class CatsIOValuesOps[A](private val io: IO[A]) { def accepted(implicit pos: source.Position): A = { - io.attempt.unsafeRunTimed(45.seconds).getOrElse(fail("IO timed out during .accepted call")) match { - case Left(e) => fail(s"IO failed when it was expected to succeed $e.", e) - case Right(value) => value - } + io.unsafeRunTimed(45.seconds).getOrElse(fail("IO timed out during .accepted call")) } def rejected(implicit pos: source.Position): Throwable = rejectedWith[Throwable] diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventClock.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventClock.scala index 27d74b5326..6c039f408b 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventClock.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventClock.scala @@ -21,7 +21,9 @@ class EventClock(instant: Ref[IO, Instant]) extends Clock[IO] { override def realTime: IO[FiniteDuration] = toDuration private def toDuration: IO[FiniteDuration] = instant.get.map { i => - FiniteDuration(i.toEpochMilli, TimeUnit.MILLISECONDS) + val seconds = FiniteDuration(i.getEpochSecond, TimeUnit.SECONDS) + val nanos = FiniteDuration(i.getNano, TimeUnit.NANOSECONDS) + seconds + nanos } } From 1d97b0b25d2b1a6fbc58f5f703ca4b848796435d Mon Sep 17 00:00:00 2001 From: Daniel Bell Date: Thu, 21 Mar 2024 11:07:12 +0100 Subject: [PATCH 07/24] Fix dependency graph --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 7388bbc8b4..de13bce15b 100755 --- a/build.sbt +++ b/build.sbt @@ -734,7 +734,7 @@ lazy val ship = project ) .enablePlugins(UniversalPlugin, JavaAppPackaging, JavaAgent, DockerPlugin, BuildInfoPlugin) .settings(shared, compilation, servicePackaging, assertJavaVersion, kamonSettings, coverage, release) - .dependsOn(sdk % "compile->compile;test->test", tests % "test->compile") + .dependsOn(sdk % "compile->compile;test->test", tests % "test->test") .settings( libraryDependencies ++= Seq(declineEffect), addCompilerPlugin(betterMonadicFor), From efc1e6b53d378104bc9bbdc800e2d980b67d2923 Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 13:37:01 +0100 Subject: [PATCH 08/24] Use docker-compose file --- .github/workflows/ci-delta-ship.yml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/.github/workflows/ci-delta-ship.yml b/.github/workflows/ci-delta-ship.yml index d91a60f46b..5010fb9471 100644 --- a/.github/workflows/ci-delta-ship.yml +++ b/.github/workflows/ci-delta-ship.yml @@ -30,6 +30,18 @@ jobs: java-version: '21' cache: 'sbt' check-latest: true + - name: Clean, build Delta & Storage images + run: | + sbt -Dsbt.color=always -Dsbt.supershell=false \ + clean \ + app/Docker/publishLocal \ + storage/Docker/publishLocal + - name: Start services + run: docker-compose -f tests/docker/docker-compose.yml up -d + - name: Waiting for Delta to start + run: | + URL="http://localhost:8080/v1/version" + curl --connect-timeout 3 --max-time 5 --retry 30 --retry-all-errors --retry-delay 3 --retry-max-time 90 $URL - name: Unit tests run: | sbt -Dsbt.color=always -Dsbt.supershell=false \ From 0a834f9bc8e59c260f56dde12b8dd0dadd8981d7 Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 14:42:11 +0100 Subject: [PATCH 09/24] Debug CI --- .github/workflows/ci-delta-ship.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.github/workflows/ci-delta-ship.yml b/.github/workflows/ci-delta-ship.yml index 5010fb9471..ab6247a876 100644 --- a/.github/workflows/ci-delta-ship.yml +++ b/.github/workflows/ci-delta-ship.yml @@ -30,6 +30,10 @@ jobs: java-version: '21' cache: 'sbt' check-latest: true + - name: Create tmp + run: | + cd /tmp + mkdir ship - name: Clean, build Delta & Storage images run: | sbt -Dsbt.color=always -Dsbt.supershell=false \ @@ -42,6 +46,12 @@ jobs: run: | URL="http://localhost:8080/v1/version" curl --connect-timeout 3 --max-time 5 --retry 30 --retry-all-errors --retry-delay 3 --retry-max-time 90 $URL + - name: list + run: ls -la /tmp + - name: tree + run: | + sudo apt install tree + tree /tmp - name: Unit tests run: | sbt -Dsbt.color=always -Dsbt.supershell=false \ From b33a286523f354e6018914b4cee34c02290417f7 Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 14:50:40 +0100 Subject: [PATCH 10/24] DEBUG Reorder CI --- .github/workflows/ci-delta-ship.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci-delta-ship.yml b/.github/workflows/ci-delta-ship.yml index ab6247a876..063cb59a95 100644 --- a/.github/workflows/ci-delta-ship.yml +++ b/.github/workflows/ci-delta-ship.yml @@ -48,12 +48,12 @@ jobs: curl --connect-timeout 3 --max-time 5 --retry 30 --retry-all-errors --retry-delay 3 --retry-max-time 90 $URL - name: list run: ls -la /tmp - - name: tree - run: | - sudo apt install tree - tree /tmp - name: Unit tests run: | sbt -Dsbt.color=always -Dsbt.supershell=false \ clean \ ship-unit-tests-with-coverage + - name: tree + run: | + sudo apt install tree + tree /tmp From c496bcef59e84c896c144e0d0ee20a5f1f842c9f Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 15:37:02 +0100 Subject: [PATCH 11/24] DEBUG Add default.conf test resource --- ship/src/test/resources/config/default.conf | 55 +++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 ship/src/test/resources/config/default.conf diff --git a/ship/src/test/resources/config/default.conf b/ship/src/test/resources/config/default.conf new file mode 100644 index 0000000000..fba48d6ebe --- /dev/null +++ b/ship/src/test/resources/config/default.conf @@ -0,0 +1,55 @@ +ship { + base-uri = "http://localhost:8080/v1" + + database { + read = ${ship.database.access} + # Access to database for write access + write = ${ship.database.access} + # Access to database for streaming access (indexing / SSEs) + streaming = ${ship.database.access} + + # when true it creates the tables on service boot + tables-autocreate = false + + cache { + # The max number of tokens in the partition cache + max-size = 1000 + # The duration after an entry in the cache expires + expire-after = 10 minutes + } + + access { + # the database host + host = 127.0.0.1 + # the database port + port = 5432 + # the pool size + pool-size = 10 + } + + name = "postgres" + username = "postgres" + password = "postgres" + } + + event-log { + query-config = { + batch-size = 30 + refresh-strategy = 3s + } + max-duration = 14 seconds + } + + organizations { + values { + # organization example + #obp = "The Open Brain Platform Organization" + } + } + + # Service account configuration for internal operations + service-account { + subject: "delta" + realm: "internal" + } +} \ No newline at end of file From fc0bb533c4f4d52c781a743a26aea2f523d09dd8 Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 16:05:29 +0100 Subject: [PATCH 12/24] Add resolver test --- .../bluebrain/nexus/ship/EndToEndTest.scala | 85 ++++++++++++++----- 1 file changed, 65 insertions(+), 20 deletions(-) diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala index 802079eeb5..c8323d2a90 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala @@ -3,6 +3,9 @@ package ch.epfl.bluebrain.nexus.ship import akka.http.scaladsl.model.StatusCodes import cats.data.NonEmptyList import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils +import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri +import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.ExportEventQuery import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset @@ -26,8 +29,6 @@ class EndToEndTest extends BaseIntegrationSpec { () } - - "The ship" should { "transfer a project" in { @@ -45,10 +46,25 @@ class EndToEndTest extends BaseIntegrationSpec { thereShouldBeAProject(project, projectJson) } + "transfer a resolver" in { + val (project, _) = thereIsAProject() + val defaultInProjectResolver = nxv + "defaultInProject" + val (_, resolverJson) = thereIsAResolver(defaultInProjectResolver, project) + + whenTheExportIsRunOnProject(project) + theOldProjectIsDeleted(project) + + weRunTheImporter(project) + weFixThePermissions(project) + + thereShouldBeAResolver(project, defaultInProjectResolver, resolverJson) + } + def thereIsAProject(): (ProjectRef, Json) = { - val project: ProjectRef = ProjectRef.unsafe(genString(), genString()) + val project: ProjectRef = ProjectRef.unsafe(genString(), genString()) createProjects(writer, project.organization.value, project.project.value).accepted - val (projectJson, status) = deltaClient.getJsonAndStatus(s"/projects/${project.organization}/${project.project}", writer).accepted + val (projectJson, status) = + deltaClient.getJsonAndStatus(s"/projects/${project.organization}/${project.project}", writer).accepted status shouldEqual StatusCodes.OK project -> projectJson } @@ -60,21 +76,25 @@ class EndToEndTest extends BaseIntegrationSpec { Offset.start ).asJson - deltaClient.post[Json]("/export/events", query, writer) { (_, response) => - response.status shouldEqual StatusCodes.Accepted - }.accepted + deltaClient + .post[Json]("/export/events", query, writer) { (_, response) => + response.status shouldEqual StatusCodes.Accepted + } + .accepted - IO.sleep(6.seconds).accepted + IO.sleep(5.seconds).accepted } def theOldProjectIsDeleted(project: ProjectRef): Unit = { - deltaClient.delete[Json](s"/projects/${project.organization}/${project.project}?rev=1&prune=true", writer) { - (_, response) => response.status shouldEqual StatusCodes.OK - }.accepted + deltaClient + .delete[Json](s"/projects/${project.organization}/${project.project}?rev=1&prune=true", writer) { + (_, response) => response.status shouldEqual StatusCodes.OK + } + .accepted eventually { - deltaClient.get[Json](s"/projects/${project.organization}/${project.project}", writer) { - (_, response) => response.status shouldEqual StatusCodes.NotFound + deltaClient.get[Json](s"/projects/${project.organization}/${project.project}", writer) { (_, response) => + response.status shouldEqual StatusCodes.NotFound } } () @@ -83,22 +103,47 @@ class EndToEndTest extends BaseIntegrationSpec { def weRunTheImporter(project: ProjectRef): Unit = { val folder = s"/tmp/ship/${project.project.value}/" val folderPath = Paths.get(folder) - val file = Files.newDirectoryStream(folderPath, "*.json").iterator().asScala.toList.head + val file = Files.newDirectoryStream(folderPath, "*.json").iterator().asScala.toList.head new RunShip().run(fs2.io.file.Path.fromNioPath(file), None).accepted () } def thereShouldBeAProject(project: ProjectRef, originalJson: Json): Assertion = { - deltaClient.get[Json](s"/projects/${project.organization}/${project.project}", writer) { - (json, response) => { - response.status shouldEqual StatusCodes.OK - json shouldEqual originalJson + deltaClient + .get[Json](s"/projects/${project.organization}/${project.project}", writer) { (json, response) => + { + response.status shouldEqual StatusCodes.OK + json shouldEqual originalJson + } } - }.accepted + .accepted } - def weFixThePermissions(project: ProjectRef) = aclDsl.addPermissions(s"/$project", writer, Permission.minimalPermissions).accepted + def weFixThePermissions(project: ProjectRef) = + aclDsl.addPermissions(s"/$project", writer, Permission.minimalPermissions).accepted + + def thereIsAResolver(resolver: Iri, project: ProjectRef): (Iri, Json) = { + val encodedResolver = UrlUtils.encode(resolver.toString) + val (resolverJson, status) = deltaClient + .getJsonAndStatus(s"/resolvers/${project.organization}/${project.project}/$encodedResolver", writer) + .accepted + status shouldEqual StatusCodes.OK + resolver -> resolverJson + } + + def thereShouldBeAResolver(project: ProjectRef, resolver: Iri, originalJson: Json): Assertion = { + val encodedResolver = UrlUtils.encode(resolver.toString) + deltaClient + .get[Json](s"/resolvers/${project.organization}/${project.project}/$encodedResolver", writer) { + (json, response) => + { + response.status shouldEqual StatusCodes.OK + json shouldEqual originalJson + } + } + .accepted + } } } From 2579a749aa8b537eb86c058cf7e77cfc846de173 Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 16:05:34 +0100 Subject: [PATCH 13/24] Clean CI --- .github/workflows/ci-delta-ship.yml | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/.github/workflows/ci-delta-ship.yml b/.github/workflows/ci-delta-ship.yml index 063cb59a95..5118c7053f 100644 --- a/.github/workflows/ci-delta-ship.yml +++ b/.github/workflows/ci-delta-ship.yml @@ -30,30 +30,18 @@ jobs: java-version: '21' cache: 'sbt' check-latest: true - - name: Create tmp - run: | - cd /tmp - mkdir ship - name: Clean, build Delta & Storage images run: | sbt -Dsbt.color=always -Dsbt.supershell=false \ - clean \ - app/Docker/publishLocal \ - storage/Docker/publishLocal + app/Docker/publishLocal - name: Start services run: docker-compose -f tests/docker/docker-compose.yml up -d - name: Waiting for Delta to start run: | URL="http://localhost:8080/v1/version" curl --connect-timeout 3 --max-time 5 --retry 30 --retry-all-errors --retry-delay 3 --retry-max-time 90 $URL - - name: list - run: ls -la /tmp - name: Unit tests run: | sbt -Dsbt.color=always -Dsbt.supershell=false \ clean \ ship-unit-tests-with-coverage - - name: tree - run: | - sudo apt install tree - tree /tmp From 54cb8f6408f0bcd9686b288ff8db895237ad7c77 Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 16:29:42 +0100 Subject: [PATCH 14/24] Add resource test --- .../bluebrain/nexus/ship/EndToEndTest.scala | 48 ++++++++++++++++++- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala index c8323d2a90..f5e6a4c5f7 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala @@ -46,7 +46,7 @@ class EndToEndTest extends BaseIntegrationSpec { thereShouldBeAProject(project, projectJson) } - "transfer a resolver" in { + "transfer the default resolver" in { val (project, _) = thereIsAProject() val defaultInProjectResolver = nxv + "defaultInProject" val (_, resolverJson) = thereIsAResolver(defaultInProjectResolver, project) @@ -60,6 +60,20 @@ class EndToEndTest extends BaseIntegrationSpec { thereShouldBeAResolver(project, defaultInProjectResolver, resolverJson) } + "transfer a generic resource" in { + val (project, _) = thereIsAProject() + val (resource, resourceJson) = thereIsAResource(project) + + whenTheExportIsRunOnProject(project) + theOldProjectIsDeleted(project) + + weRunTheImporter(project) + weFixThePermissions(project) + + // TODO: This test currently fails because of the dummy ValidateResource implementation in ship + thereShouldBeAResource(project, resource, resourceJson) + } + def thereIsAProject(): (ProjectRef, Json) = { val project: ProjectRef = ProjectRef.unsafe(genString(), genString()) createProjects(writer, project.organization.value, project.project.value).accepted @@ -82,7 +96,7 @@ class EndToEndTest extends BaseIntegrationSpec { } .accepted - IO.sleep(5.seconds).accepted + IO.sleep(4.seconds).accepted } def theOldProjectIsDeleted(project: ProjectRef): Unit = { @@ -144,6 +158,36 @@ class EndToEndTest extends BaseIntegrationSpec { } .accepted } + + def thereIsAResource(project: ProjectRef): (Iri, Json) = { + val resource = nxv + genString() + val encodedResource = UrlUtils.encode(resource.toString) + val body = json"""{"hello": "world"}""" + deltaClient + .put[Json](s"/resources/${project.organization}/${project.project}/_/$encodedResource", body, writer) { + (_, response) => + response.status shouldEqual StatusCodes.Created + } + .accepted + val (resourceJson, status) = deltaClient + .getJsonAndStatus(s"/resources/${project.organization}/${project.project}/_/$encodedResource", writer) + .accepted + status shouldEqual StatusCodes.OK + resource -> resourceJson + } + + def thereShouldBeAResource(project: ProjectRef, resource: Iri, originalJson: Json): Assertion = { + val encodedResolver = UrlUtils.encode(resource.toString) + deltaClient + .get[Json](s"/resources/${project.organization}/${project.project}/_/$encodedResolver", writer) { + (json, response) => + { + response.status shouldEqual StatusCodes.OK + json shouldEqual originalJson + } + } + .accepted + } } } From 30dead338f3673f3e083c29cf1fae7a0f20f9af1 Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 17:29:47 +0100 Subject: [PATCH 15/24] Correct Resource wiring --- .../nexus/delta/wiring/AclsModule.scala | 2 +- .../nexus/delta/sdk/acls/AclsImpl.scala | 5 +-- .../nexus/delta/sdk/acls/AclsImplSpec.scala | 2 +- .../OrganizationDeleterSuite.scala | 2 +- ...erPermissionsScopeInitializationSpec.scala | 4 +-- .../bluebrain/nexus/ship/EventClock.scala | 2 +- .../epfl/bluebrain/nexus/ship/RunShip.scala | 4 ++- .../bluebrain/nexus/ship/acls/AclOps.scala | 23 +++++++++++++ .../nexus/ship/resolvers/ResolverOps.scala | 27 +++++++++++++++ .../ship/resolvers/ResolverProcessor.scala | 12 ++----- .../ship/resources/ResourceProcessor.scala | 33 ++++++++++--------- .../nexus/ship/schemas/SchemaOps.scala | 32 ++++++++++++++++++ .../bluebrain/nexus/ship/EndToEndTest.scala | 3 +- .../bluebrain/nexus/tests/HttpClient.scala | 16 +++++---- 14 files changed, 125 insertions(+), 42 deletions(-) create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/acls/AclOps.scala create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverOps.scala create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaOps.scala diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/AclsModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/AclsModule.scala index aa758276e3..c701b9d827 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/AclsModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/AclsModule.scala @@ -37,7 +37,7 @@ object AclsModule extends ModuleDef { permissions.fetchPermissionSet, AclsImpl.findUnknownRealms(xas), permissions.minimum, - config.acls, + config.acls.eventLog, xas, clock ) diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala index 341279f4b0..306a34cc20 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission import ch.epfl.bluebrain.nexus.delta.sdk.realms.Realms import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.delta.sourcing._ +import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import ch.epfl.bluebrain.nexus.delta.sourcing.state.GlobalStateStore @@ -108,12 +109,12 @@ object AclsImpl { fetchPermissionSet: IO[Set[Permission]], findUnknownRealms: Set[Label] => IO[Unit], minimum: Set[Permission], - config: AclsConfig, + config: EventLogConfig, xas: Transactors, clock: Clock[IO] ): Acls = new AclsImpl( - GlobalEventLog(Acls.definition(fetchPermissionSet, findUnknownRealms, clock), config.eventLog, xas), + GlobalEventLog(Acls.definition(fetchPermissionSet, findUnknownRealms, clock), config, xas), minimum ) diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImplSpec.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImplSpec.scala index 928756cce6..6fac31b3f2 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImplSpec.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImplSpec.scala @@ -64,7 +64,7 @@ class AclsImplSpec extends CatsEffectSpec with DoobieScalaTestFixture with Cance IO.pure(minimumPermissions), Acls.findUnknownRealms(_, Set(realm, realm2)), minimumPermissions, - AclsConfig(eventLogConfig), + eventLogConfig, xas, clock ) diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleterSuite.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleterSuite.scala index 75b4c0eb51..c702bb4016 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleterSuite.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleterSuite.scala @@ -43,7 +43,7 @@ class OrganizationDeleterSuite extends NexusSuite with ConfigFixtures with Proje private val fields = ProjectFields(None, ApiMappings.empty, None, None) private lazy val orgs = OrganizationsImpl(ScopeInitializer.noop, eventLogConfig, xas, clock) private val permission = Permissions.resources.read - private lazy val acls = AclsImpl(IO.pure(Set(permission)), _ => IO.unit, Set(), aclsConfig, xas, clock) + private lazy val acls = AclsImpl(IO.pure(Set(permission)), _ => IO.unit, Set(), aclsConfig.eventLog, xas, clock) implicit val subject: Subject = Identity.User("Bob", Label.unsafe("realm")) implicit val uuidF: UUIDF = UUIDF.fixed(UUID.randomUUID()) diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/OwnerPermissionsScopeInitializationSpec.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/OwnerPermissionsScopeInitializationSpec.scala index 65a761216d..a0a71021bc 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/OwnerPermissionsScopeInitializationSpec.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/OwnerPermissionsScopeInitializationSpec.scala @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk.projects import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sdk.ConfigFixtures import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.{Acl, AclAddress} -import ch.epfl.bluebrain.nexus.delta.sdk.acls.{Acls, AclsConfig, AclsImpl} +import ch.epfl.bluebrain.nexus.delta.sdk.acls.{Acls, AclsImpl} import ch.epfl.bluebrain.nexus.delta.sdk.generators.{OrganizationGen, PermissionsGen, ProjectGen} import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.{Caller, ServiceAccount} import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions @@ -22,7 +22,7 @@ class OwnerPermissionsScopeInitializationSpec extends CatsEffectSpec with Doobie IO.pure(PermissionsGen.minimum), Acls.findUnknownRealms(_, Set(saRealm, usersRealm)), PermissionsGen.minimum, - AclsConfig(eventLogConfig), + eventLogConfig, xas, clock ) diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventClock.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventClock.scala index 6c039f408b..c7f5c56d39 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventClock.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventClock.scala @@ -22,7 +22,7 @@ class EventClock(instant: Ref[IO, Instant]) extends Clock[IO] { private def toDuration: IO[FiniteDuration] = instant.get.map { i => val seconds = FiniteDuration(i.getEpochSecond, TimeUnit.SECONDS) - val nanos = FiniteDuration(i.getNano, TimeUnit.NANOSECONDS) + val nanos = FiniteDuration(i.getNano, TimeUnit.NANOSECONDS) seconds + nanos } } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala index d0f547d325..fcfd6fad7e 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala @@ -15,6 +15,7 @@ import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor import ch.epfl.bluebrain.nexus.ship.resources.ResourceProcessor +import ch.epfl.bluebrain.nexus.ship.schemas.SchemaOps import fs2.Stream import fs2.io.file.{Files, Path} import io.circe.parser.decode @@ -42,9 +43,10 @@ class RunShip { _ <- orgProvider.create(config.organizations.values) events = eventStream(file) fetchActiveOrg = FetchActiveOrganization(xas) + fetchSchema <- SchemaOps.fetchSchema(config.eventLog, clock, xas) projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, xas)(baseUri) resolverProcessor <- ResolverProcessor(fetchContext, eventLogConfig, xas) - resourceProcessor <- ResourceProcessor(eventLogConfig, fetchContext, xas) + resourceProcessor <- ResourceProcessor(eventLogConfig, fetchContext, fetchSchema, xas) report <- EventProcessor.run(events, projectProcessor, resolverProcessor, resourceProcessor) } yield report } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/acls/AclOps.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/acls/AclOps.scala new file mode 100644 index 0000000000..2afe11dd82 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/acls/AclOps.scala @@ -0,0 +1,23 @@ +package ch.epfl.bluebrain.nexus.ship.acls + +import cats.effect.{Clock, IO} +import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclsImpl +import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission +import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors +import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig + +object AclOps { + + def acls(config: EventLogConfig, clock: Clock[IO], xas: Transactors) = { + val permissionSet = Set(Permission.unsafe("resources/read")) + AclsImpl( + IO.pure(permissionSet), + AclsImpl.findUnknownRealms(xas), + permissionSet, + config, + xas, + clock + ) + } + +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverOps.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverOps.scala new file mode 100644 index 0000000000..819bd17792 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverOps.scala @@ -0,0 +1,27 @@ +package ch.epfl.bluebrain.nexus.ship.resolvers + +import cats.effect.IO +import cats.effect.kernel.Clock +import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi +import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext +import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.{ResolverContextResolution, ResolversImpl} +import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors +import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig + +object ResolverOps { + + def resolvers(fetchContext: FetchContext, config: EventLogConfig, clock: Clock[IO], xas: Transactors)(implicit + jsonLdApi: JsonLdApi, + uuidF: UUIDF + ) = + ResolversImpl( + fetchContext, + // We rely on the parsed values and not on the original value + ResolverContextResolution.never, + config, + xas, + clock + ) + +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverProcessor.scala index 00282709ff..c50b9c0483 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverProcessor.scala @@ -6,7 +6,7 @@ import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext -import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.{ResolverContextResolution, Resolvers, ResolversImpl} +import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.Resolvers import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.model.IdentityResolution._ import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.model.ResolverEvent._ import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.model.ResolverRejection.{IncorrectRev, ResourceAlreadyExists} @@ -81,14 +81,8 @@ object ResolverProcessor { )(implicit api: JsonLdApi): IO[ResolverProcessor] = EventClock.init().map { clock => implicit val uuidF: UUIDF = FailingUUID - val resolvers = ResolversImpl( - fetchContext, - // We rely on the parsed values and not on the original value - ResolverContextResolution.never, - config, - xas, - clock - ) + + val resolvers = ResolverOps.resolvers(fetchContext, config, clock, xas) new ResolverProcessor(resolvers, clock) } } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala index b68b9f8a28..c2b1bd11e2 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala @@ -5,21 +5,23 @@ import cats.implicits.catsSyntaxOptionId import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution +import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller -import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdAssembly -import ch.epfl.bluebrain.nexus.delta.sdk.model.{IdSegment, IdSegmentRef, ResourceF} +import ch.epfl.bluebrain.nexus.delta.sdk.model.{IdSegment, IdSegmentRef} import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext -import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution -import ch.epfl.bluebrain.nexus.delta.sdk.resources.ValidationResult.NoValidation +import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.{ResolverContextResolution, ResourceResolution} +import ch.epfl.bluebrain.nexus.delta.sdk.resources._ import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceEvent import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceEvent._ import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceRejection.{IncorrectRev, ResourceAlreadyExists} -import ch.epfl.bluebrain.nexus.delta.sdk.resources._ -import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.Schema +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.FetchSchema import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef, ResourceRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ResourceRef} import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors} +import ch.epfl.bluebrain.nexus.ship.acls.AclOps +import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverOps import ch.epfl.bluebrain.nexus.ship.resources.ResourceProcessor.logger import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, FailingUUID, ImportStatus} import io.circe.Decoder @@ -79,8 +81,9 @@ object ResourceProcessor { private val logger = Logger[ResourceProcessor] def apply( - eventLogConfig: EventLogConfig, + config: EventLogConfig, fetchContext: FetchContext, + fetchSchema: FetchSchema, xas: Transactors )(implicit jsonLdApi: JsonLdApi): IO[ResourceProcessor] = EventClock.init().map { clock => @@ -88,15 +91,15 @@ object ResourceProcessor { val detectChange = DetectChange(false) - val validate = new ValidateResource { - override def apply(jsonld: JsonLdAssembly, schema: SchemaClaim, enforceSchema: Boolean): IO[ValidationResult] = - IO.pure(NoValidation(ProjectRef.unsafe("org", "proj"))) - override def apply(jsonld: JsonLdAssembly, schema: ResourceF[Schema]): IO[ValidationResult] = - IO.pure(NoValidation(ProjectRef.unsafe("org", "proj"))) - } + val aclCheck = AclCheck(AclOps.acls(config, clock, xas)) + val resolvers = ResolverOps.resolvers(fetchContext, config, clock, xas) + val resourceResolution = + ResourceResolution.schemaResource(aclCheck, resolvers, fetchSchema, excludeDeprecated = false) + + val validate = ValidateResource(resourceResolution)(RemoteContextResolution.never) val resourceDef = Resources.definition(validate, detectChange, clock) - val resourceLog = ScopedEventLog(resourceDef, eventLogConfig, xas) + val resourceLog = ScopedEventLog(resourceDef, config, xas) val resources = ResourcesImpl( resourceLog, diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaOps.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaOps.scala new file mode 100644 index 0000000000..83a8bfe6b2 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaOps.scala @@ -0,0 +1,32 @@ +package ch.epfl.bluebrain.nexus.ship.schemas + +import cats.effect.{Clock, IO} +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution +import ch.epfl.bluebrain.nexus.delta.rdf.shacl.ShaclShapesGraph +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.Schemas.SchemaLog +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.{FetchSchema, Schemas, ValidateSchema} +import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors} +import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig + +object SchemaOps { + + def validateSchema(implicit api: JsonLdApi): IO[ValidateSchema] = { + val rcr = RemoteContextResolution.never + ShaclShapesGraph.shaclShaclShapes.map(ValidateSchema(api, _, rcr)) + } + + def schemaLog(config: EventLogConfig, clock: Clock[IO], xas: Transactors)(implicit api: JsonLdApi): IO[SchemaLog] = + for { + validate <- validateSchema + schemaDef = Schemas.definition(validate, clock) + } yield ScopedEventLog(schemaDef, config, xas) + + def fetchSchema(config: EventLogConfig, clock: Clock[IO], xas: Transactors)(implicit + api: JsonLdApi + ): IO[FetchSchema] = + for { + log <- schemaLog(config, clock, xas) + } yield FetchSchema(log) + +} diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala index f5e6a4c5f7..003f3c07d2 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala @@ -61,7 +61,7 @@ class EndToEndTest extends BaseIntegrationSpec { } "transfer a generic resource" in { - val (project, _) = thereIsAProject() + val (project, _) = thereIsAProject() val (resource, resourceJson) = thereIsAResource(project) whenTheExportIsRunOnProject(project) @@ -70,7 +70,6 @@ class EndToEndTest extends BaseIntegrationSpec { weRunTheImporter(project) weFixThePermissions(project) - // TODO: This test currently fails because of the dummy ValidateResource implementation in ship thereShouldBeAResource(project, resource, resourceJson) } diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/HttpClient.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/HttpClient.scala index 8a542a5ab5..7c9582c31b 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/HttpClient.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/HttpClient.scala @@ -200,7 +200,9 @@ class HttpClient private (baseUrl: Uri, httpExt: HttpExt)(implicit requestJson(GET, url, None, identity, (a: A, _: HttpResponse) => a, jsonHeaders) } - def getJsonAndStatus(url: String, identity: Identity)(implicit um: FromEntityUnmarshaller[Json]): IO[(Json, StatusCode)] = { + def getJsonAndStatus(url: String, identity: Identity)(implicit + um: FromEntityUnmarshaller[Json] + ): IO[(Json, StatusCode)] = { requestJsonAndStatus(GET, url, None, identity, jsonHeaders) } @@ -266,12 +268,12 @@ class HttpClient private (baseUrl: Uri, httpExt: HttpExt)(implicit } def requestJsonAndStatus( - method: HttpMethod, - url: String, - body: Option[Json], - identity: Identity, - extraHeaders: Seq[HttpHeader] - )(implicit um: FromEntityUnmarshaller[Json]): IO[(Json, StatusCode)] = + method: HttpMethod, + url: String, + body: Option[Json], + identity: Identity, + extraHeaders: Seq[HttpHeader] + )(implicit um: FromEntityUnmarshaller[Json]): IO[(Json, StatusCode)] = request[Json, Json, (Json, StatusCode)]( method, url, From aafaccf0adbfe68645d228697b5c3aef6c9100e7 Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 17:31:29 +0100 Subject: [PATCH 16/24] Disable coverage for now --- .github/workflows/ci-delta-ship.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-delta-ship.yml b/.github/workflows/ci-delta-ship.yml index 5118c7053f..3fa8d0a92e 100644 --- a/.github/workflows/ci-delta-ship.yml +++ b/.github/workflows/ci-delta-ship.yml @@ -44,4 +44,4 @@ jobs: run: | sbt -Dsbt.color=always -Dsbt.supershell=false \ clean \ - ship-unit-tests-with-coverage + ship-unit-tests From a861378e864b791f6071461bc57a3e2a2efba47f Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 17:39:59 +0100 Subject: [PATCH 17/24] AclsImpl takes the EventLogConfig directly --- .../ch/epfl/bluebrain/nexus/delta/routes/AclsRoutesSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/AclsRoutesSpec.scala b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/AclsRoutesSpec.scala index a4e6180462..e8c31aed4a 100644 --- a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/AclsRoutesSpec.scala +++ b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/AclsRoutesSpec.scala @@ -7,7 +7,7 @@ import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress.{Organization, Project, Root} import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.{Acl, AclAddress} -import ch.epfl.bluebrain.nexus.delta.sdk.acls.{AclCheck, Acls, AclsConfig, AclsImpl} +import ch.epfl.bluebrain.nexus.delta.sdk.acls.{AclCheck, Acls, AclsImpl} import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.implicits._ @@ -90,7 +90,7 @@ class AclsRoutesSpec extends BaseRouteSpec { IO.pure(Set(aclsPermissions.read, aclsPermissions.write, managePermission, events.read)), Acls.findUnknownRealms(_, Set(realm1, realm2)), Set(aclsPermissions.read, aclsPermissions.write, managePermission, events.read), - AclsConfig(EventLogConfig(QueryConfig(5, RefreshStrategy.Stop), 100.millis)), + EventLogConfig(QueryConfig(5, RefreshStrategy.Stop), 100.millis), xas, clock ) From 70e7e9f089fbd5a67af4bce029e777df31b0a45e Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 17:41:12 +0100 Subject: [PATCH 18/24] Longer wait time for export --- .../test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala index 003f3c07d2..1b0f033af5 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala @@ -95,7 +95,7 @@ class EndToEndTest extends BaseIntegrationSpec { } .accepted - IO.sleep(4.seconds).accepted + IO.sleep(6.seconds).accepted } def theOldProjectIsDeleted(project: ProjectRef): Unit = { From 476bbe3f82a0237f0c1a964d1ac866a0cf5978ae Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 17:46:59 +0100 Subject: [PATCH 19/24] Try docker image caching --- .github/workflows/ci-delta-ship.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/ci-delta-ship.yml b/.github/workflows/ci-delta-ship.yml index 3fa8d0a92e..09e7b9d403 100644 --- a/.github/workflows/ci-delta-ship.yml +++ b/.github/workflows/ci-delta-ship.yml @@ -34,6 +34,10 @@ jobs: run: | sbt -Dsbt.color=always -Dsbt.supershell=false \ app/Docker/publishLocal + - name: Cache Docker images. + uses: ScribeMD/docker-cache@0.4.0 + with: + key: docker-${{ runner.os }}-${{ hashFiles('tests/docker/docker-compose.yml') }} - name: Start services run: docker-compose -f tests/docker/docker-compose.yml up -d - name: Waiting for Delta to start From 904b4e7fb396412600512e36595a73d47523a8be Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 17:50:37 +0100 Subject: [PATCH 20/24] Fix CI --- .github/workflows/ci-delta-ship.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci-delta-ship.yml b/.github/workflows/ci-delta-ship.yml index 09e7b9d403..226d3406c7 100644 --- a/.github/workflows/ci-delta-ship.yml +++ b/.github/workflows/ci-delta-ship.yml @@ -23,6 +23,8 @@ jobs: uses: actions/checkout@v4 with: fetch-depth: 0 + - name: Create /tmp/ship folder + run: mkdir -p /tmp/ship - name: Setup JDK uses: actions/setup-java@v4 with: From 2f55dd768934bdf55b7dc7a424958e406cdb216f Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 17:52:28 +0100 Subject: [PATCH 21/24] Move docker image caching --- .github/workflows/ci-delta-ship.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci-delta-ship.yml b/.github/workflows/ci-delta-ship.yml index 226d3406c7..10be64f5ef 100644 --- a/.github/workflows/ci-delta-ship.yml +++ b/.github/workflows/ci-delta-ship.yml @@ -32,14 +32,14 @@ jobs: java-version: '21' cache: 'sbt' check-latest: true - - name: Clean, build Delta & Storage images - run: | - sbt -Dsbt.color=always -Dsbt.supershell=false \ - app/Docker/publishLocal - name: Cache Docker images. uses: ScribeMD/docker-cache@0.4.0 with: key: docker-${{ runner.os }}-${{ hashFiles('tests/docker/docker-compose.yml') }} + - name: Clean, build Delta & Storage images + run: | + sbt -Dsbt.color=always -Dsbt.supershell=false \ + app/Docker/publishLocal - name: Start services run: docker-compose -f tests/docker/docker-compose.yml up -d - name: Waiting for Delta to start From 7b6fd7d0398a68537342ab4461706031dd5b51c0 Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Thu, 21 Mar 2024 18:16:08 +0100 Subject: [PATCH 22/24] Docker image caching is not faster --- .github/workflows/ci-delta-ship.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.github/workflows/ci-delta-ship.yml b/.github/workflows/ci-delta-ship.yml index 10be64f5ef..4c348c4e51 100644 --- a/.github/workflows/ci-delta-ship.yml +++ b/.github/workflows/ci-delta-ship.yml @@ -32,10 +32,6 @@ jobs: java-version: '21' cache: 'sbt' check-latest: true - - name: Cache Docker images. - uses: ScribeMD/docker-cache@0.4.0 - with: - key: docker-${{ runner.os }}-${{ hashFiles('tests/docker/docker-compose.yml') }} - name: Clean, build Delta & Storage images run: | sbt -Dsbt.color=always -Dsbt.supershell=false \ From 1b734cde0841e59b93e0a7aa9f7c4bab99ac222c Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Fri, 22 Mar 2024 12:11:41 +0100 Subject: [PATCH 23/24] Wire dummy SchemaProcessor --- .../epfl/bluebrain/nexus/ship/RunShip.scala | 22 +++-- .../acls/{AclOps.scala => AclWiring.scala} | 6 +- .../ship/resolvers/ResolverProcessor.scala | 7 +- ...ResolverOps.scala => ResolverWiring.scala} | 12 +-- .../ship/resources/ResourceProcessor.scala | 49 +++-------- .../nexus/ship/resources/ResourceWiring.scala | 41 +++++++++ .../nexus/ship/schemas/SchemaOps.scala | 32 ------- .../nexus/ship/schemas/SchemaProcessor.scala | 87 +++++++++++++++++++ .../nexus/ship/schemas/SchemaWiring.scala | 49 +++++++++++ 9 files changed, 218 insertions(+), 87 deletions(-) rename ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/acls/{AclOps.scala => AclWiring.scala} (84%) rename ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/{ResolverOps.scala => ResolverWiring.scala} (79%) create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceWiring.scala delete mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaOps.scala create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaProcessor.scala create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaWiring.scala diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala index fcfd6fad7e..146c31f90f 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala @@ -14,8 +14,8 @@ import ch.epfl.bluebrain.nexus.ship.model.InputEvent import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor -import ch.epfl.bluebrain.nexus.ship.resources.ResourceProcessor -import ch.epfl.bluebrain.nexus.ship.schemas.SchemaOps +import ch.epfl.bluebrain.nexus.ship.resources.{ResourceProcessor, ResourceWiring} +import ch.epfl.bluebrain.nexus.ship.schemas.{SchemaProcessor, SchemaWiring} import fs2.Stream import fs2.io.file.{Files, Path} import io.circe.parser.decode @@ -43,11 +43,23 @@ class RunShip { _ <- orgProvider.create(config.organizations.values) events = eventStream(file) fetchActiveOrg = FetchActiveOrganization(xas) - fetchSchema <- SchemaOps.fetchSchema(config.eventLog, clock, xas) + // Wiring + schemaLog = SchemaWiring.schemaLog(config.eventLog, xas, jsonLdApi) + resourceLog = ResourceWiring.resourceLog(fetchContext, schemaLog, eventLogConfig, xas) + schemaImports = SchemaWiring.schemaImports( + resourceLog, + schemaLog, + fetchContext, + eventLogConfig, + xas + ) + // Processors projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, xas)(baseUri) resolverProcessor <- ResolverProcessor(fetchContext, eventLogConfig, xas) - resourceProcessor <- ResourceProcessor(eventLogConfig, fetchContext, fetchSchema, xas) - report <- EventProcessor.run(events, projectProcessor, resolverProcessor, resourceProcessor) + schemaProcessor <- SchemaProcessor(schemaLog, fetchContext, schemaImports) + resourceProcessor <- ResourceProcessor(resourceLog, fetchContext) + report <- EventProcessor + .run(events, projectProcessor, resolverProcessor, schemaProcessor, resourceProcessor) } yield report } } yield report diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/acls/AclOps.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/acls/AclWiring.scala similarity index 84% rename from ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/acls/AclOps.scala rename to ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/acls/AclWiring.scala index 2afe11dd82..967962ff5d 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/acls/AclOps.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/acls/AclWiring.scala @@ -1,14 +1,14 @@ package ch.epfl.bluebrain.nexus.ship.acls import cats.effect.{Clock, IO} -import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclsImpl +import ch.epfl.bluebrain.nexus.delta.sdk.acls.{Acls, AclsImpl} import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig -object AclOps { +object AclWiring { - def acls(config: EventLogConfig, clock: Clock[IO], xas: Transactors) = { + def acls(config: EventLogConfig, clock: Clock[IO], xas: Transactors): Acls = { val permissionSet = Set(Permission.unsafe("resources/read")) AclsImpl( IO.pure(permissionSet), diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverProcessor.scala index c50b9c0483..7b0e261d90 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverProcessor.scala @@ -2,7 +2,6 @@ package ch.epfl.bluebrain.nexus.ship.resolvers import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.Logger -import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext @@ -17,7 +16,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Identity} import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor.logger -import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, FailingUUID, ImportStatus} +import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, ImportStatus} import io.circe.Decoder class ResolverProcessor private (resolvers: Resolvers, clock: EventClock) extends EventProcessor[ResolverEvent] { @@ -80,9 +79,7 @@ object ResolverProcessor { xas: Transactors )(implicit api: JsonLdApi): IO[ResolverProcessor] = EventClock.init().map { clock => - implicit val uuidF: UUIDF = FailingUUID - - val resolvers = ResolverOps.resolvers(fetchContext, config, clock, xas) + val resolvers = ResolverWiring.resolvers(fetchContext, config, clock, xas) new ResolverProcessor(resolvers, clock) } } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverOps.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverWiring.scala similarity index 79% rename from ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverOps.scala rename to ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverWiring.scala index 819bd17792..f04f9813eb 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverOps.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverWiring.scala @@ -5,16 +5,17 @@ import cats.effect.kernel.Clock import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext -import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.{ResolverContextResolution, ResolversImpl} +import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.{ResolverContextResolution, Resolvers, ResolversImpl} import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig +import ch.epfl.bluebrain.nexus.ship.FailingUUID -object ResolverOps { +object ResolverWiring { def resolvers(fetchContext: FetchContext, config: EventLogConfig, clock: Clock[IO], xas: Transactors)(implicit - jsonLdApi: JsonLdApi, - uuidF: UUIDF - ) = + jsonLdApi: JsonLdApi + ): Resolvers = { + implicit val uuidF: UUIDF = FailingUUID ResolversImpl( fetchContext, // We rely on the parsed values and not on the original value @@ -23,5 +24,6 @@ object ResolverOps { xas, clock ) + } } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala index c2b1bd11e2..1514c4b680 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceProcessor.scala @@ -1,29 +1,22 @@ package ch.epfl.bluebrain.nexus.ship.resources -import cats.effect.IO +import cats.effect.{Clock, IO} import cats.implicits.catsSyntaxOptionId import ch.epfl.bluebrain.nexus.delta.kernel.Logger -import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi -import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution -import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.model.{IdSegment, IdSegmentRef} import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext -import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.{ResolverContextResolution, ResourceResolution} +import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution +import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources.ResourceLog import ch.epfl.bluebrain.nexus.delta.sdk.resources._ import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceEvent import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceEvent._ import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceRejection.{IncorrectRev, ResourceAlreadyExists} -import ch.epfl.bluebrain.nexus.delta.sdk.schemas.FetchSchema -import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ResourceRef} -import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors} -import ch.epfl.bluebrain.nexus.ship.acls.AclOps -import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverOps import ch.epfl.bluebrain.nexus.ship.resources.ResourceProcessor.logger -import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, FailingUUID, ImportStatus} +import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, ImportStatus} import io.circe.Decoder class ResourceProcessor private (resources: Resources, clock: EventClock) extends EventProcessor[ResourceEvent] { @@ -68,7 +61,7 @@ class ResourceProcessor private (resources: Resources, clock: EventClock) extend }.redeemWith( { case a: ResourceAlreadyExists => logger.warn(a)("The resource already exists").as(ImportStatus.Dropped) - case i: IncorrectRev => logger.warn(i)("An incorrect revision as been provided").as(ImportStatus.Dropped) + case i: IncorrectRev => logger.warn(i)("An incorrect revision has been provided").as(ImportStatus.Dropped) case other => IO.raiseError(other) }, _ => IO.pure(ImportStatus.Success) @@ -81,32 +74,14 @@ object ResourceProcessor { private val logger = Logger[ResourceProcessor] def apply( - config: EventLogConfig, - fetchContext: FetchContext, - fetchSchema: FetchSchema, - xas: Transactors + log: Clock[IO] => IO[ResourceLog], + fetchContext: FetchContext )(implicit jsonLdApi: JsonLdApi): IO[ResourceProcessor] = - EventClock.init().map { clock => - implicit val uuidF: UUIDF = FailingUUID - - val detectChange = DetectChange(false) - - val aclCheck = AclCheck(AclOps.acls(config, clock, xas)) - val resolvers = ResolverOps.resolvers(fetchContext, config, clock, xas) - val resourceResolution = - ResourceResolution.schemaResource(aclCheck, resolvers, fetchSchema, excludeDeprecated = false) - - val validate = ValidateResource(resourceResolution)(RemoteContextResolution.never) - - val resourceDef = Resources.definition(validate, detectChange, clock) - val resourceLog = ScopedEventLog(resourceDef, config, xas) - - val resources = ResourcesImpl( - resourceLog, - fetchContext, - ResolverContextResolution.never - ) - new ResourceProcessor(resources, clock) + EventClock.init().flatMap { clock => + for { + resourceLog <- log(clock) + resources = ResourcesImpl(resourceLog, fetchContext, ResolverContextResolution.never) + } yield new ResourceProcessor(resources, clock) } } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceWiring.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceWiring.scala new file mode 100644 index 0000000000..f574c2b286 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceWiring.scala @@ -0,0 +1,41 @@ +package ch.epfl.bluebrain.nexus.ship.resources + +import cats.effect.{Clock, IO} +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution +import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck +import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext +import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResourceResolution +import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources.ResourceLog +import ch.epfl.bluebrain.nexus.delta.sdk.resources.{DetectChange, Resources, ValidateResource} +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.FetchSchema +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.Schemas.SchemaLog +import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors} +import ch.epfl.bluebrain.nexus.ship.acls.AclWiring +import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverWiring + +object ResourceWiring { + + def resourceLog( + fetchContext: FetchContext, + schemaLog: Clock[IO] => IO[SchemaLog], + config: EventLogConfig, + xas: Transactors + )(implicit + jsonLdApi: JsonLdApi + ): Clock[IO] => IO[ResourceLog] = { clock => + val detectChange = DetectChange(false) + val aclCheck = AclCheck(AclWiring.acls(config, clock, xas)) + val resolvers = ResolverWiring.resolvers(fetchContext, config, clock, xas) + + for { + fetchSchema <- schemaLog(clock).map(FetchSchema(_)) + resourceResolution = + ResourceResolution.schemaResource(aclCheck, resolvers, fetchSchema, excludeDeprecated = false) + validate = ValidateResource(resourceResolution)(RemoteContextResolution.never) + resourceDef = Resources.definition(validate, detectChange, clock) + } yield ScopedEventLog(resourceDef, config, xas) + } + +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaOps.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaOps.scala deleted file mode 100644 index 83a8bfe6b2..0000000000 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaOps.scala +++ /dev/null @@ -1,32 +0,0 @@ -package ch.epfl.bluebrain.nexus.ship.schemas - -import cats.effect.{Clock, IO} -import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi -import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution -import ch.epfl.bluebrain.nexus.delta.rdf.shacl.ShaclShapesGraph -import ch.epfl.bluebrain.nexus.delta.sdk.schemas.Schemas.SchemaLog -import ch.epfl.bluebrain.nexus.delta.sdk.schemas.{FetchSchema, Schemas, ValidateSchema} -import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors} -import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig - -object SchemaOps { - - def validateSchema(implicit api: JsonLdApi): IO[ValidateSchema] = { - val rcr = RemoteContextResolution.never - ShaclShapesGraph.shaclShaclShapes.map(ValidateSchema(api, _, rcr)) - } - - def schemaLog(config: EventLogConfig, clock: Clock[IO], xas: Transactors)(implicit api: JsonLdApi): IO[SchemaLog] = - for { - validate <- validateSchema - schemaDef = Schemas.definition(validate, clock) - } yield ScopedEventLog(schemaDef, config, xas) - - def fetchSchema(config: EventLogConfig, clock: Clock[IO], xas: Transactors)(implicit - api: JsonLdApi - ): IO[FetchSchema] = - for { - log <- schemaLog(config, clock, xas) - } yield FetchSchema(log) - -} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaProcessor.scala new file mode 100644 index 0000000000..6eed701711 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaProcessor.scala @@ -0,0 +1,87 @@ +package ch.epfl.bluebrain.nexus.ship.schemas + +import cats.effect.{Clock, IO} +import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi +import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller +import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext +import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.Schemas.SchemaLog +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.SchemaEvent +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.SchemaRejection.{IncorrectRev, ResourceAlreadyExists} +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.{SchemaImports, Schemas, SchemasImpl} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject +import ch.epfl.bluebrain.nexus.ship.schemas.SchemaProcessor.logger +import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, FailingUUID, ImportStatus} +import io.circe.Decoder + +class SchemaProcessor private (schemas: Schemas, clock: EventClock) extends EventProcessor[SchemaEvent] { + + override def resourceType: EntityType = Schemas.entityType + + override def decoder: Decoder[SchemaEvent] = SchemaEvent.serializer.codec + + override def evaluate(event: SchemaEvent): IO[ImportStatus] = { + for { + _ <- clock.setInstant(event.instant) + result <- evaluateInternal(event) + } yield result + } + + // TODO: Provide a correct implementation + private def evaluateInternal(event: SchemaEvent): IO[ImportStatus] = { + implicit val s: Subject = event.subject + implicit val c: Caller = Caller(s, Set.empty) + + val id = event.id + val projectRef = event.project + val cRev = event.rev - 1 + event match { + case SchemaEvent.SchemaCreated(_, _, value, _, _, _, _, _) => + schemas.create(id, projectRef, value) + case SchemaEvent.SchemaUpdated(_, _, value, _, _, _, _, _) => + schemas.update(id, projectRef, cRev, value) + case SchemaEvent.SchemaRefreshed(_, _, _, _, _, _, _) => + // Refreshed events are not supported + IO.unit + case SchemaEvent.SchemaTagDeleted(_, _, _, _, _, _) => + // Tags have been removed + IO.unit + case _: SchemaEvent.SchemaTagAdded => + // Tags have been removed + IO.unit + case _: SchemaEvent.SchemaDeprecated => + schemas.deprecate(id, projectRef, cRev) + case _: SchemaEvent.SchemaUndeprecated => + schemas.undeprecate(id, projectRef, cRev) + } + }.redeemWith( + { + case a: ResourceAlreadyExists => logger.warn(a)("The schema already exists").as(ImportStatus.Dropped) + case i: IncorrectRev => logger.warn(i)("An incorrect revision has been provided").as(ImportStatus.Dropped) + case other => IO.raiseError(other) + }, + _ => IO.pure(ImportStatus.Success) + ) + +} + +object SchemaProcessor { + + private val logger = Logger[SchemaProcessor] + + def apply( + log: Clock[IO] => IO[SchemaLog], + fetchContext: FetchContext, + schemaImports: Clock[IO] => IO[SchemaImports] + )(implicit jsonLdApi: JsonLdApi): IO[SchemaProcessor] = EventClock.init().flatMap { clock => + val rcr = ResolverContextResolution.never + for { + schemaLog <- log(clock) + imports <- schemaImports(clock) + schemas = SchemasImpl(schemaLog, fetchContext, imports, rcr)(jsonLdApi, FailingUUID) + } yield new SchemaProcessor(schemas, clock) + } + +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaWiring.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaWiring.scala new file mode 100644 index 0000000000..2f361b38b5 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaWiring.scala @@ -0,0 +1,49 @@ +package ch.epfl.bluebrain.nexus.ship.schemas + +import cats.effect.{Clock, IO} +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution +import ch.epfl.bluebrain.nexus.delta.rdf.shacl.ShaclShapesGraph +import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck +import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext +import ch.epfl.bluebrain.nexus.delta.sdk.resources.FetchResource +import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources.ResourceLog +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.Schemas.SchemaLog +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.{FetchSchema, SchemaImports, Schemas, ValidateSchema} +import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors} +import ch.epfl.bluebrain.nexus.ship.acls.AclWiring +import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverWiring + +object SchemaWiring { + + def schemaImports( + resourceLog: Clock[IO] => IO[ResourceLog], + schemaLog: Clock[IO] => IO[SchemaLog], + fetchContext: FetchContext, + config: EventLogConfig, + xas: Transactors + )(implicit + jsonLdApi: JsonLdApi + ): Clock[IO] => IO[SchemaImports] = { clock => + val aclCheck = AclCheck(AclWiring.acls(config, clock, xas)) + val resolvers = ResolverWiring.resolvers(fetchContext, config, clock, xas) + for { + fetchResource <- resourceLog(clock).map(FetchResource(_)) + fetchSchema <- schemaLog(clock).map(FetchSchema(_)) + } yield SchemaImports(aclCheck, resolvers, fetchSchema, fetchResource) + } + + private def validateSchema(implicit api: JsonLdApi): IO[ValidateSchema] = { + val rcr = RemoteContextResolution.never + ShaclShapesGraph.shaclShaclShapes.map(ValidateSchema(api, _, rcr)) + } + + def schemaLog(config: EventLogConfig, xas: Transactors, api: JsonLdApi): Clock[IO] => IO[SchemaLog] = + clock => + for { + validate <- validateSchema(api) + schemaDef = Schemas.definition(validate, clock) + } yield ScopedEventLog(schemaDef, config, xas) + +} From b36db37453f506c20fbd76dc2c4f533621132928 Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Fri, 22 Mar 2024 14:58:43 +0100 Subject: [PATCH 24/24] Wire initial remote context / resolver context resolution for schemas to work --- .../bluebrain/nexus/ship/ContextWiring.scala | 46 +++++++++++++++++++ .../epfl/bluebrain/nexus/ship/RunShip.scala | 3 +- .../nexus/ship/schemas/SchemaProcessor.scala | 5 +- .../nexus/ship/schemas/SchemaWiring.scala | 11 +++-- .../bluebrain/nexus/ship/EndToEndTest.scala | 44 ++++++++++++++++++ 5 files changed, 101 insertions(+), 8 deletions(-) create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ContextWiring.scala diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ContextWiring.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ContextWiring.scala new file mode 100644 index 0000000000..12b36e86c2 --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ContextWiring.scala @@ -0,0 +1,46 @@ +package ch.epfl.bluebrain.nexus.ship + +import cats.effect.{Clock, IO} +import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceLoader +import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} +import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck +import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext +import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution +import ch.epfl.bluebrain.nexus.delta.sdk.resources.FetchResource +import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources.ResourceLog +import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors +import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig +import ch.epfl.bluebrain.nexus.ship.acls.AclWiring +import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverWiring + +object ContextWiring { + + implicit private val loader: ClasspathResourceLoader = ClasspathResourceLoader.withContext(getClass) + + def remoteContextResolution: IO[RemoteContextResolution] = + for { + shaclCtx <- ContextValue.fromFile("contexts/shacl.json") + schemasMetaCtx <- ContextValue.fromFile("contexts/schemas-metadata.json") + } yield RemoteContextResolution.fixed( + contexts.shacl -> shaclCtx, + contexts.schemasMetadata -> schemasMetaCtx + ) + + def resolverContextResolution( + resourceLog: Clock[IO] => IO[ResourceLog], + fetchContext: FetchContext, + config: EventLogConfig, + xas: Transactors + )(implicit jsonLdApi: JsonLdApi): Clock[IO] => IO[ResolverContextResolution] = { clock => + val aclCheck = AclCheck(AclWiring.acls(config, clock, xas)) + val resolvers = ResolverWiring.resolvers(fetchContext, config, clock, xas) + + for { + fetchResource <- resourceLog(clock).map(FetchResource(_)) + rcr <- remoteContextResolution + } yield ResolverContextResolution(aclCheck, resolvers, rcr, fetchResource) + } + +} diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala index 146c31f90f..51bf10c80f 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala @@ -53,10 +53,11 @@ class RunShip { eventLogConfig, xas ) + rcr = ContextWiring.resolverContextResolution(resourceLog, fetchContext, eventLogConfig, xas) // Processors projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, xas)(baseUri) resolverProcessor <- ResolverProcessor(fetchContext, eventLogConfig, xas) - schemaProcessor <- SchemaProcessor(schemaLog, fetchContext, schemaImports) + schemaProcessor <- SchemaProcessor(schemaLog, fetchContext, schemaImports, rcr) resourceProcessor <- ResourceProcessor(resourceLog, fetchContext) report <- EventProcessor .run(events, projectProcessor, resolverProcessor, schemaProcessor, resourceProcessor) diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaProcessor.scala index 6eed701711..aaa119db73 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaProcessor.scala @@ -74,10 +74,11 @@ object SchemaProcessor { def apply( log: Clock[IO] => IO[SchemaLog], fetchContext: FetchContext, - schemaImports: Clock[IO] => IO[SchemaImports] + schemaImports: Clock[IO] => IO[SchemaImports], + resolverContextResolution: Clock[IO] => IO[ResolverContextResolution] )(implicit jsonLdApi: JsonLdApi): IO[SchemaProcessor] = EventClock.init().flatMap { clock => - val rcr = ResolverContextResolution.never for { + rcr <- resolverContextResolution(clock) schemaLog <- log(clock) imports <- schemaImports(clock) schemas = SchemasImpl(schemaLog, fetchContext, imports, rcr)(jsonLdApi, FailingUUID) diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaWiring.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaWiring.scala index 2f361b38b5..26017c0bd2 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaWiring.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/schemas/SchemaWiring.scala @@ -2,7 +2,6 @@ package ch.epfl.bluebrain.nexus.ship.schemas import cats.effect.{Clock, IO} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi -import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.rdf.shacl.ShaclShapesGraph import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext @@ -12,6 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.schemas.Schemas.SchemaLog import ch.epfl.bluebrain.nexus.delta.sdk.schemas.{FetchSchema, SchemaImports, Schemas, ValidateSchema} import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors} +import ch.epfl.bluebrain.nexus.ship.ContextWiring import ch.epfl.bluebrain.nexus.ship.acls.AclWiring import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverWiring @@ -34,10 +34,11 @@ object SchemaWiring { } yield SchemaImports(aclCheck, resolvers, fetchSchema, fetchResource) } - private def validateSchema(implicit api: JsonLdApi): IO[ValidateSchema] = { - val rcr = RemoteContextResolution.never - ShaclShapesGraph.shaclShaclShapes.map(ValidateSchema(api, _, rcr)) - } + private def validateSchema(implicit api: JsonLdApi): IO[ValidateSchema] = + for { + rcr <- ContextWiring.remoteContextResolution + shapesGraph <- ShaclShapesGraph.shaclShaclShapes + } yield ValidateSchema(api, shapesGraph, rcr) def schemaLog(config: EventLogConfig, xas: Transactors, api: JsonLdApi): Clock[IO] => IO[SchemaLog] = clock => diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala index 1b0f033af5..90cf49733c 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/EndToEndTest.scala @@ -73,6 +73,19 @@ class EndToEndTest extends BaseIntegrationSpec { thereShouldBeAResource(project, resource, resourceJson) } + "transfer a schema" in { + val (project, _) = thereIsAProject() + val (schema, schemaJson) = thereIsASchema(project) + + whenTheExportIsRunOnProject(project) + theOldProjectIsDeleted(project) + + weRunTheImporter(project) + weFixThePermissions(project) + + thereShouldBeASchema(project, schema, schemaJson) + } + def thereIsAProject(): (ProjectRef, Json) = { val project: ProjectRef = ProjectRef.unsafe(genString(), genString()) createProjects(writer, project.organization.value, project.project.value).accepted @@ -187,6 +200,37 @@ class EndToEndTest extends BaseIntegrationSpec { } .accepted } + + def thereIsASchema(project: ProjectRef): (Iri, Json) = { + val schema = nxv + genString() + val encodedSchema = UrlUtils.encode(schema.toString) + // TODO: Review the json of the simpleSchema + val simpleSchema = + json"""{"shapes":[{"@id":"http://example.com/MyShape","@type":"http://www.w3.org/ns/shacl#NodeShape","nodeKind":"http://www.w3.org/ns/shacl#BlankNodeOrIRI","targetClass":"http://example.com/Custom","property":[{"path":"http://example.com/name","datatype":"http://www.w3.org/2001/XMLSchema#string","minCount":1}]}]}""" + deltaClient + .put[Json](s"/schemas/${project.organization}/${project.project}/$encodedSchema", simpleSchema, writer) { + (_, response) => + response.status shouldEqual StatusCodes.Created + } + .accepted + val (resourceJson, status) = deltaClient + .getJsonAndStatus(s"/schemas/${project.organization}/${project.project}/$encodedSchema", writer) + .accepted + status shouldEqual StatusCodes.OK + schema -> resourceJson + } + + def thereShouldBeASchema(project: ProjectRef, schema: Iri, originalJson: Json): Assertion = { + val encodedIri = UrlUtils.encode(schema.toString) + deltaClient + .get[Json](s"/schemas/${project.organization}/${project.project}/$encodedIri", writer) { (json, response) => + { + response.status shouldEqual StatusCodes.OK + json shouldEqual originalJson + } + } + .accepted + } } }