Skip to content

Commit

Permalink
Fix config loading in nexus ship (#4900)
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored Apr 24, 2024
1 parent cfccd07 commit 4f3b018
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 18 deletions.
27 changes: 13 additions & 14 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,21 @@ object InitShip {

def configAndStream(
run: RunCommand,
defaultConfig: ShipConfig,
s3Client: S3StorageClient
): IO[(ShipConfig, fs2.Stream[IO, RowEvent])] = {
ShipConfig.load(run.config).flatMap { shipConfig =>
run.mode match {
case RunMode.Local =>
val eventsStream = EventStreamer.localStreamer.stream(run.path, run.offset)
ShipConfig.load(run.config).map((_, eventsStream))
case RunMode.S3 =>
val eventsStream =
EventStreamer.s3eventStreamer(s3Client, shipConfig.s3.importBucket).stream(run.path, run.offset)
val config = run.config match {
case Some(configPath) => ShipConfig.loadFromS3(s3Client, shipConfig.s3.importBucket, configPath)
case None => IO.pure(shipConfig)
}
config.map((_, eventsStream))
}
run.mode match {
case RunMode.Local =>
val eventsStream = EventStreamer.localStreamer.stream(run.path, run.offset)
ShipConfig.load(run.config).map((_, eventsStream))
case RunMode.S3 =>
val eventsStream =
EventStreamer.s3eventStreamer(s3Client, defaultConfig.s3.importBucket).stream(run.path, run.offset)
val config = run.config match {
case Some(configPath) => ShipConfig.loadFromS3(s3Client, defaultConfig.s3.importBucket, configPath)
case None => IO.pure(defaultConfig)
}
config.map((_, eventsStream))
}
}
}
8 changes: 4 additions & 4 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ object Main
val clock = Clock[IO]

val resources = for {
initialConfig <- Resource.eval(ShipConfig.load(r.config))
client <- S3StorageClient.resource(initialConfig.s3.endpoint, DefaultCredentialsProvider.create())
xas <- Transactors.init(initialConfig.database)
(config, eventsStream) <- Resource.eval(InitShip.configAndStream(r, client))
defaultConfig <- Resource.eval(ShipConfig.load(None))
client <- S3StorageClient.resource(defaultConfig.s3.endpoint, DefaultCredentialsProvider.create())
xas <- Transactors.init(defaultConfig.database)
(config, eventsStream) <- Resource.eval(InitShip.configAndStream(r, defaultConfig, client))
} yield (client, config, eventsStream, xas)

resources.use { case (client, config, eventsStream, xas) =>
Expand Down

0 comments on commit 4f3b018

Please sign in to comment.