Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ResourceProcessor, SchemaProcessor; add E2E tests for ship #4803

Merged
merged 24 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion .github/workflows/ci-delta-ship.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,27 @@ jobs:
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Create /tmp/ship folder
run: mkdir -p /tmp/ship
- name: Setup JDK
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: '21'
cache: 'sbt'
check-latest: true
- name: Clean, build Delta & Storage images
run: |
sbt -Dsbt.color=always -Dsbt.supershell=false \
app/Docker/publishLocal
- name: Start services
run: docker-compose -f tests/docker/docker-compose.yml up -d
- name: Waiting for Delta to start
run: |
URL="http://localhost:8080/v1/version"
curl --connect-timeout 3 --max-time 5 --retry 30 --retry-all-errors --retry-delay 3 --retry-max-time 90 $URL
- name: Unit tests
run: |
sbt -Dsbt.color=always -Dsbt.supershell=false \
clean \
ship-unit-tests-with-coverage
ship-unit-tests
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ lazy val ship = project
)
.enablePlugins(UniversalPlugin, JavaAppPackaging, JavaAgent, DockerPlugin, BuildInfoPlugin)
.settings(shared, compilation, servicePackaging, assertJavaVersion, kamonSettings, coverage, release)
.dependsOn(sdk % "compile->compile;test->test", testkit % "test->compile")
.dependsOn(sdk % "compile->compile;test->test", tests % "test->test")
.settings(
libraryDependencies ++= Seq(declineEffect),
addCompilerPlugin(betterMonadicFor),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object AclsModule extends ModuleDef {
permissions.fetchPermissionSet,
AclsImpl.findUnknownRealms(xas),
permissions.minimum,
config.acls,
config.acls.eventLog,
xas,
clock
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress.{Organization, Project, Root}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.{Acl, AclAddress}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.{AclCheck, Acls, AclsConfig, AclsImpl}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.{AclCheck, Acls, AclsImpl}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
Expand Down Expand Up @@ -90,7 +90,7 @@ class AclsRoutesSpec extends BaseRouteSpec {
IO.pure(Set(aclsPermissions.read, aclsPermissions.write, managePermission, events.read)),
Acls.findUnknownRealms(_, Set(realm1, realm2)),
Set(aclsPermissions.read, aclsPermissions.write, managePermission, events.read),
AclsConfig(EventLogConfig(QueryConfig(5, RefreshStrategy.Stop), 100.millis)),
EventLogConfig(QueryConfig(5, RefreshStrategy.Stop), 100.millis),
xas,
clock
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
import ch.epfl.bluebrain.nexus.delta.sdk.realms.Realms
import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
import ch.epfl.bluebrain.nexus.delta.sourcing._
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GlobalStateStore
Expand Down Expand Up @@ -108,12 +109,12 @@ object AclsImpl {
fetchPermissionSet: IO[Set[Permission]],
findUnknownRealms: Set[Label] => IO[Unit],
minimum: Set[Permission],
config: AclsConfig,
config: EventLogConfig,
xas: Transactors,
clock: Clock[IO]
): Acls =
new AclsImpl(
GlobalEventLog(Acls.definition(fetchPermissionSet, findUnknownRealms, clock), config.eventLog, xas),
GlobalEventLog(Acls.definition(fetchPermissionSet, findUnknownRealms, clock), config, xas),
minimum
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class AclsImplSpec extends CatsEffectSpec with DoobieScalaTestFixture with Cance
IO.pure(minimumPermissions),
Acls.findUnknownRealms(_, Set(realm, realm2)),
minimumPermissions,
AclsConfig(eventLogConfig),
eventLogConfig,
xas,
clock
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class OrganizationDeleterSuite extends NexusSuite with ConfigFixtures with Proje
private val fields = ProjectFields(None, ApiMappings.empty, None, None)
private lazy val orgs = OrganizationsImpl(ScopeInitializer.noop, eventLogConfig, xas, clock)
private val permission = Permissions.resources.read
private lazy val acls = AclsImpl(IO.pure(Set(permission)), _ => IO.unit, Set(), aclsConfig, xas, clock)
private lazy val acls = AclsImpl(IO.pure(Set(permission)), _ => IO.unit, Set(), aclsConfig.eventLog, xas, clock)

implicit val subject: Subject = Identity.User("Bob", Label.unsafe("realm"))
implicit val uuidF: UUIDF = UUIDF.fixed(UUID.randomUUID())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk.projects
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sdk.ConfigFixtures
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.{Acl, AclAddress}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.{Acls, AclsConfig, AclsImpl}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.{Acls, AclsImpl}
import ch.epfl.bluebrain.nexus.delta.sdk.generators.{OrganizationGen, PermissionsGen, ProjectGen}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.{Caller, ServiceAccount}
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
Expand All @@ -22,7 +22,7 @@ class OwnerPermissionsScopeInitializationSpec extends CatsEffectSpec with Doobie
IO.pure(PermissionsGen.minimum),
Acls.findUnknownRealms(_, Set(saRealm, usersRealm)),
PermissionsGen.minimum,
AclsConfig(eventLogConfig),
eventLogConfig,
xas,
clock
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ trait CatsIOValues {

implicit final class CatsIOValuesOps[A](private val io: IO[A]) {
def accepted(implicit pos: source.Position): A = {
io.attempt.unsafeRunTimed(45.seconds).getOrElse(fail("IO timed out during .accepted call")) match {
case Left(e) => fail(s"IO failed when it was expected to succeed $e.", e)
case Right(value) => value
}
io.unsafeRunTimed(45.seconds).getOrElse(fail("IO timed out during .accepted call"))
}

def rejected(implicit pos: source.Position): Throwable = rejectedWith[Throwable]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceLoader
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.resources.FetchResource
import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources.ResourceLog
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import ch.epfl.bluebrain.nexus.ship.acls.AclWiring
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverWiring

object ContextWiring {

implicit private val loader: ClasspathResourceLoader = ClasspathResourceLoader.withContext(getClass)

def remoteContextResolution: IO[RemoteContextResolution] =
for {
shaclCtx <- ContextValue.fromFile("contexts/shacl.json")
schemasMetaCtx <- ContextValue.fromFile("contexts/schemas-metadata.json")
} yield RemoteContextResolution.fixed(
contexts.shacl -> shaclCtx,
contexts.schemasMetadata -> schemasMetaCtx
)

def resolverContextResolution(
resourceLog: Clock[IO] => IO[ResourceLog],
fetchContext: FetchContext,
config: EventLogConfig,
xas: Transactors
)(implicit jsonLdApi: JsonLdApi): Clock[IO] => IO[ResolverContextResolution] = { clock =>
val aclCheck = AclCheck(AclWiring.acls(config, clock, xas))
val resolvers = ResolverWiring.resolvers(fetchContext, config, clock, xas)

for {
fetchResource <- resourceLog(clock).map(FetchResource(_))
rcr <- remoteContextResolution
} yield ResolverContextResolution(aclCheck, resolvers, rcr, fetchResource)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ class EventClock(instant: Ref[IO, Instant]) extends Clock[IO] {
override def realTime: IO[FiniteDuration] = toDuration

private def toDuration: IO[FiniteDuration] = instant.get.map { i =>
FiniteDuration(i.toEpochMilli, TimeUnit.MILLISECONDS)
val seconds = FiniteDuration(i.getEpochSecond, TimeUnit.SECONDS)
val nanos = FiniteDuration(i.getNano, TimeUnit.NANOSECONDS)
seconds + nanos
}
}

Expand Down
53 changes: 3 additions & 50 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala
Original file line number Diff line number Diff line change
@@ -1,27 +1,14 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.{Clock, ExitCode, IO}
import cats.effect.{ExitCode, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.FetchActiveOrganization
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings
import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas
import ch.epfl.bluebrain.nexus.delta.ship.BuildInfo
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig
import ch.epfl.bluebrain.nexus.ship.model.InputEvent
import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider
import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor
import com.monovore.decline.Opts
import com.monovore.decline.effect.CommandIOApp
import fs2.Stream
import fs2.io.file.{Files, Path}
import io.circe.parser._
import fs2.io.file.Path

object Main
extends CommandIOApp(
Expand Down Expand Up @@ -54,52 +41,18 @@ object Main
override def main: Opts[IO[ExitCode]] =
(run orElse showConfig)
.map {
case Run(file, config, _) => run(file, config)
case Run(file, config, _) => new RunShip().run(file, config)
case ShowConfig(config) => showConfig(config)
}
.map(_.as(ExitCode.Success))

private[ship] def run(file: Path, config: Option[Path]): IO[ImportReport] = {
val clock = Clock[IO]
val uuidF = UUIDF.random
// Resources may have been created with different configurations so we adopt the lenient one for the import
implicit val jsonLdApi: JsonLdApi = JsonLdJavaApi.lenient
for {
_ <- logger.info(s"Running the import with file $file, config $config and from offset $offset")
config <- ShipConfig.load(config)
report <- Transactors.init(config.database).use { xas =>
val orgProvider =
OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF)
val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled)
val eventLogConfig = config.eventLog
val baseUri = config.baseUri
for {
// Provision organizations
_ <- orgProvider.create(config.organizations.values)
events = eventStream(file)
fetchActiveOrg = FetchActiveOrganization(xas)
projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, xas)(baseUri)
resolverProcessor <- ResolverProcessor(fetchContext, eventLogConfig, xas)
report <- EventProcessor.run(events, projectProcessor, resolverProcessor)
} yield report
}
} yield report
}

private[ship] def showConfig(config: Option[Path]) =
for {
_ <- logger.info(s"Showing reconciled config")
config <- ShipConfig.merge(config).map(_._2)
_ <- logger.info(config.root().render())
} yield ()

private def eventStream(file: Path): Stream[IO, InputEvent] =
Files[IO].readUtf8Lines(file).zipWithIndex.evalMap { case (line, index) =>
IO.fromEither(decode[InputEvent](line)).onError { err =>
logger.error(err)(s"Error parsing to event at line $index")
}
}

sealed private trait Command

final private case class Run(file: Path, config: Option[Path], offset: Offset) extends Command
Expand Down
76 changes: 76 additions & 0 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.FetchActiveOrganization
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings
import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig
import ch.epfl.bluebrain.nexus.ship.model.InputEvent
import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider
import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor
import ch.epfl.bluebrain.nexus.ship.resources.{ResourceProcessor, ResourceWiring}
import ch.epfl.bluebrain.nexus.ship.schemas.{SchemaProcessor, SchemaWiring}
import fs2.Stream
import fs2.io.file.{Files, Path}
import io.circe.parser.decode

class RunShip {

private val logger = Logger[RunShip]

def run(file: Path, config: Option[Path]): IO[ImportReport] = {
val clock = Clock[IO]
val uuidF = UUIDF.random
// Resources may have been created with different configurations so we adopt the lenient one for the import
implicit val jsonLdApi: JsonLdApi = JsonLdJavaApi.lenient
for {
_ <- logger.info(s"Running the import with file $file, config $config")
config <- ShipConfig.load(config)
report <- Transactors.init(config.database).use { xas =>
val orgProvider =
OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF)
val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled)
val eventLogConfig = config.eventLog
val baseUri = config.baseUri
for {
// Provision organizations
_ <- orgProvider.create(config.organizations.values)
events = eventStream(file)
fetchActiveOrg = FetchActiveOrganization(xas)
// Wiring
schemaLog = SchemaWiring.schemaLog(config.eventLog, xas, jsonLdApi)
resourceLog = ResourceWiring.resourceLog(fetchContext, schemaLog, eventLogConfig, xas)
schemaImports = SchemaWiring.schemaImports(
resourceLog,
schemaLog,
fetchContext,
eventLogConfig,
xas
)
rcr = ContextWiring.resolverContextResolution(resourceLog, fetchContext, eventLogConfig, xas)
// Processors
projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, xas)(baseUri)
resolverProcessor <- ResolverProcessor(fetchContext, eventLogConfig, xas)
schemaProcessor <- SchemaProcessor(schemaLog, fetchContext, schemaImports, rcr)
resourceProcessor <- ResourceProcessor(resourceLog, fetchContext)
report <- EventProcessor
.run(events, projectProcessor, resolverProcessor, schemaProcessor, resourceProcessor)
} yield report
}
} yield report
}

private def eventStream(file: Path): Stream[IO, InputEvent] =
Files[IO].readUtf8Lines(file).zipWithIndex.evalMap { case (line, index) =>
IO.fromEither(decode[InputEvent](line)).onError { err =>
logger.error(err)(s"Error parsing to event at line $index")
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package ch.epfl.bluebrain.nexus.ship.acls

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.{Acls, AclsImpl}
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig

object AclWiring {

def acls(config: EventLogConfig, clock: Clock[IO], xas: Transactors): Acls = {
val permissionSet = Set(Permission.unsafe("resources/read"))
AclsImpl(
IO.pure(permissionSet),
AclsImpl.findUnknownRealms(xas),
permissionSet,
config,
xas,
clock
)
}

}
Loading