diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/RefreshOrStop.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/RefreshOrStop.scala index cd5dcf433c..5c62e0857b 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/RefreshOrStop.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/RefreshOrStop.scala @@ -5,10 +5,14 @@ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.sourcing.Scope import ch.epfl.bluebrain.nexus.delta.sourcing.config.ElemQueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.config.ElemQueryConfig.{DelayConfig, PassivationConfig, StopConfig} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshOrStop.Outcome import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshOrStop.Outcome._ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectActivitySignals import fs2.Stream +import fs2.concurrent.SignallingRef + +import concurrent.duration._ /** * Computes the outcome to apply when all elements are consumed by a projection @@ -54,12 +58,7 @@ object RefreshOrStop { case true => logger.debug(s"Project '$project' is active, continue after ${w.delay}") >> IO.sleep(w.delay).as(DelayedPassivation) - case false => - logger.info(s"Project '$project' is inactive, pausing until some activity is seen again.") >> - Stream.never[IO].interruptWhen(signal).compile.drain >> - logger - .info(s"An update has been detected on project '$project', querying will resume.") - .as(Passivated) + case false => passivate(project, signal) } case None => logger.debug(s"No signal has been found for project '$project', continue after ${w.delay}") >> IO @@ -73,4 +72,12 @@ object RefreshOrStop { } } + private def passivate(project: ProjectRef, signal: SignallingRef[IO, Boolean]) = + for { + _ <- logger.info(s"Project '$project' is inactive, pausing until some activity is seen again.") + durationOpt <- Stream.awakeEvery[IO](1.second).scanMonoid.interruptWhen(signal).compile.last + minutes = durationOpt.getOrElse(0.minute).toMinutes + _ <- logger.info(s"Project '$project' is active again after `$minutes minutes`, querying will resume.") + } yield Passivated + }