Skip to content

Commit

Permalink
Keep track of the passivation duration and log it (#5215)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Nov 1, 2024
1 parent 733dad3 commit 3e2206a
Showing 1 changed file with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sourcing.Scope
import ch.epfl.bluebrain.nexus.delta.sourcing.config.ElemQueryConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.config.ElemQueryConfig.{DelayConfig, PassivationConfig, StopConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshOrStop.Outcome
import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshOrStop.Outcome._
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectActivitySignals
import fs2.Stream
import fs2.concurrent.SignallingRef

import concurrent.duration._

/**
* Computes the outcome to apply when all elements are consumed by a projection
Expand Down Expand Up @@ -54,12 +58,7 @@ object RefreshOrStop {
case true =>
logger.debug(s"Project '$project' is active, continue after ${w.delay}") >>
IO.sleep(w.delay).as(DelayedPassivation)
case false =>
logger.info(s"Project '$project' is inactive, pausing until some activity is seen again.") >>
Stream.never[IO].interruptWhen(signal).compile.drain >>
logger
.info(s"An update has been detected on project '$project', querying will resume.")
.as(Passivated)
case false => passivate(project, signal)
}
case None =>
logger.debug(s"No signal has been found for project '$project', continue after ${w.delay}") >> IO
Expand All @@ -73,4 +72,12 @@ object RefreshOrStop {
}
}

private def passivate(project: ProjectRef, signal: SignallingRef[IO, Boolean]) =
for {
_ <- logger.info(s"Project '$project' is inactive, pausing until some activity is seen again.")
durationOpt <- Stream.awakeEvery[IO](1.second).scanMonoid.interruptWhen(signal).compile.last
minutes = durationOpt.getOrElse(0.minute).toMinutes
_ <- logger.info(s"Project '$project' is active again after `$minutes minutes`, querying will resume.")
} yield Passivated

}

0 comments on commit 3e2206a

Please sign in to comment.