Skip to content

Commit

Permalink
Remove skipped steps and startFrom
Browse files Browse the repository at this point in the history
  • Loading branch information
cquiroz committed Oct 15, 2024
1 parent 769f3a8 commit 7f0260b
Show file tree
Hide file tree
Showing 24 changed files with 53 additions and 1,255 deletions.
30 changes: 1 addition & 29 deletions modules/engine/src/main/scala/observe/engine/Engine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,39 +51,14 @@ class Engine[F[_]: MonadThrow: Logger, S, U] private (
{
putS(id)(
Sequence.State.status.replace(SequenceState.Running.Init)(
seq.skips.getOrElse(seq).rollback
seq.rollback
)
) *>
send(Event.executing(id))
}.whenA(seq.status.isIdle || seq.status.isError)
case None => unit
}

/**
* startFrom starts a sequence from an arbitrary step. It does it by marking all previous steps to
* be skipped and then modifying the state sequence as if it was run. If the requested step is
* already run or marked to be skipped, the sequence will start from the next runnable step
*/
def startFrom(id: Observation.Id, step: Step.Id): HandleType[Unit] =
getS(id).flatMap {
case Some(seq)
if (seq.status.isIdle || seq.status.isError) && seq.toSequence.steps.exists(
_.id === step
) =>
val steps = seq.toSequence.steps
.takeWhile(_.id =!= step)
.mapFilter(p => (!p.status.isFinished).option(p.id))
val withSkips = steps.foldLeft[Sequence.State[F]](seq) { case (s, i) =>
s.setSkipMark(i, v = true)
}
putS(id)(
Sequence.State.status.replace(SequenceState.Running.Init)(
withSkips.skips.getOrElse(withSkips).rollback
)
) *> send(Event.executing(id))
case _ => unit
}

def pause(id: Observation.Id): HandleType[Unit] =
modifyS(id)(Sequence.State.userStopSet(true))

Expand Down Expand Up @@ -405,9 +380,6 @@ class Engine[F[_]: MonadThrow: Logger, S, U] private (
case Breakpoints(id, _, step, v) =>
debug(s"Engine: breakpoints changed for sequence $id and step $step to $v") *>
modifyS(id)(_.setBreakpoints(step, v)) *> pure(UserCommandResponse(ue, Outcome.Ok, None))
case SkipMark(id, _, step, v) =>
debug(s"Engine: skip mark changed for sequence $id and step $step to $v") *>
modifyS(id)(_.setSkipMark(step, v)) *> pure(UserCommandResponse(ue, Outcome.Ok, None))
case Poll(_) =>
debug("Engine: Polling current state") *> pure(UserCommandResponse(ue, Outcome.Ok, None))
case GetState(f) => getState(f) *> pure(UserCommandResponse(ue, Outcome.Ok, None))
Expand Down
70 changes: 22 additions & 48 deletions modules/engine/src/main/scala/observe/engine/EngineStep.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package observe.engine
import cats.syntax.all.*
import lucuma.core.enums.Breakpoint
import lucuma.core.model.sequence.Step
import lucuma.core.util.NewType
import monocle.Focus
import monocle.Iso
import monocle.Lens
Expand All @@ -20,18 +19,11 @@ import observe.model.StepState
case class EngineStep[F[_]](
id: Step.Id,
breakpoint: Breakpoint,
skipped: EngineStep.Skipped,
skipMark: EngineStep.SkipMark,
executions: List[ParallelActions[F]]
)

object EngineStep {

object SkipMark extends NewType[Boolean]
type SkipMark = SkipMark.Type
object Skipped extends NewType[Boolean]
type Skipped = Skipped.Type

def isoBool: Iso[Breakpoint, Boolean] =
Iso[Breakpoint, Boolean](_ === Breakpoint.Enabled)(b =>
if (b) Breakpoint.Enabled else Breakpoint.Disabled
Expand All @@ -40,44 +32,34 @@ object EngineStep {
def breakpointL[F[_]]: Lens[EngineStep[F], Boolean] =
Focus[EngineStep[F]](_.breakpoint).andThen(isoBool)

def skippedL[F[_]]: Lens[EngineStep[F], Boolean] =
Focus[EngineStep[F]](_.skipped).andThen(Skipped.value)

def init[F[_]](id: Step.Id, executions: List[ParallelActions[F]]): EngineStep[F] =
EngineStep(id = id,
breakpoint = Breakpoint.Disabled,
skipped = Skipped(false),
skipMark = SkipMark(false),
executions = executions
)
EngineStep(id = id, breakpoint = Breakpoint.Disabled, executions = executions)

/**
* Calculate the `Step` `Status` based on the underlying `Action`s.
*/
private def status_[F[_]](step: EngineStep[F]): StepState =
if (step.skipped.value) StepState.Skipped
else
// Find an error in the Step
step.executions
.flatMap(_.toList)
.find(Action.errored)
.flatMap { x =>
x.state.runState match {
case ActionState.Failed(Result.Error(msg)) => msg.some
case _ => None
// Return error or continue with the rest of the checks
}
// Find an error in the Step
step.executions
.flatMap(_.toList)
.find(Action.errored)
.flatMap { x =>
x.state.runState match {
case ActionState.Failed(Result.Error(msg)) => msg.some
case _ => None
// Return error or continue with the rest of the checks
}
.map[StepState](StepState.Failed.apply)
.getOrElse(
// All actions in this Step were completed successfully, or the Step is empty.
if (step.executions.flatMap(_.toList).exists(Action.aborted)) StepState.Aborted
else if (step.executions.flatMap(_.toList).forall(Action.completed)) StepState.Completed
else if (step.executions.flatMap(_.toList).forall(_.state.runState.isIdle))
StepState.Pending
// Not all actions are completed or pending.
else StepState.Running
)
}
.map[StepState](StepState.Failed.apply)
.getOrElse(
// All actions in this Step were completed successfully, or the Step is empty.
if (step.executions.flatMap(_.toList).exists(Action.aborted)) StepState.Aborted
else if (step.executions.flatMap(_.toList).forall(Action.completed)) StepState.Completed
else if (step.executions.flatMap(_.toList).forall(_.state.runState.isIdle))
StepState.Pending
// Not all actions are completed or pending.
else StepState.Running
)

extension [F[_]](s: EngineStep[F]) {
def status: StepState = EngineStep.status_(s)
Expand All @@ -89,7 +71,6 @@ object EngineStep {
case class Zipper[F[_]](
id: Step.Id,
breakpoint: Breakpoint,
skipMark: SkipMark,
pending: List[ParallelActions[F]],
focus: Execution[F],
done: List[ParallelActions[F]],
Expand Down Expand Up @@ -121,9 +102,7 @@ object EngineStep {
*/
val uncurrentify: Option[EngineStep[F]] =
if (pending.isEmpty)
focus.uncurrentify.map(x =>
EngineStep(id, breakpoint, Skipped(false), skipMark, x.prepend(done))
)
focus.uncurrentify.map(x => EngineStep(id, breakpoint, x.prepend(done)))
else None

/**
Expand All @@ -134,13 +113,9 @@ object EngineStep {
EngineStep(
id = id,
breakpoint = breakpoint,
skipped = Skipped(false),
skipMark = skipMark,
executions = done ++ focus.toParallelActionsList ++ pending
)

val skip: EngineStep[F] = toStep.copy(skipped = Skipped(true))

def update(executions: List[ParallelActions[F]]): Zipper[F] =
Zipper
.calcRolledback(executions)
Expand Down Expand Up @@ -175,7 +150,6 @@ object EngineStep {
Zipper(
step.id,
step.breakpoint,
step.skipMark,
exes,
x,
Nil,
Expand Down
6 changes: 0 additions & 6 deletions modules/engine/src/main/scala/observe/engine/Event.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ object Event {
steps: List[Step.Id],
v: Breakpoint
): Event[F, S, U] = EventUser[F, S, U](Breakpoints(id, user.some, steps, v))
def skip[F[_], S, U](
id: Observation.Id,
user: User,
step: Step.Id,
v: Boolean
): Event[F, S, U] = EventUser[F, S, U](SkipMark(id, user.some, step, v))
def poll[F[_], S, U](clientId: ClientId): Event[F, S, U] =
EventUser[F, S, U](Poll(clientId))
def getState[F[_], S, U](f: S => Option[Stream[F, Event[F, S, U]]]): Event[F, S, U] =
Expand Down
69 changes: 5 additions & 64 deletions modules/engine/src/main/scala/observe/engine/Sequence.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ object Sequence {
done: List[EngineStep[F]]
) {

private val (toSkip, remaining): (List[EngineStep[F]], List[EngineStep[F]]) =
pending.span(st => st.skipMark.value)

/**
* Runs the next execution. If the current `Step` is completed it adds the `StepZ` under focus
* to the list of completed `Step`s and makes the next pending `Step` the current one.
Expand All @@ -59,9 +56,7 @@ object Sequence {
focus.next match {
// Step completed
case None =>
val (toSkip, remaining): (List[EngineStep[F]], List[EngineStep[F]]) =
pending.span(st => st.skipMark.value && !(st.breakpoint === Breakpoint.Enabled))
remaining match {
pending match {
case Nil => None
case stepp :: stepps =>
(EngineStep.Zipper.currentify(stepp), focus.uncurrentify).mapN((curr, stepd) =>
Expand All @@ -70,7 +65,7 @@ object Sequence {
atomId,
stepps,
curr,
(done :+ stepd) ::: toSkip.map(_.copy(skipped = EngineStep.Skipped(true)))
done :+ stepd
)
)
}
Expand All @@ -80,42 +75,13 @@ object Sequence {

def rollback: Zipper[F] = this.copy(focus = focus.rollback)

// Skips steps before starting a sequence.
def skips: Option[Zipper[F]] =
if (focus.skipMark.value) {
remaining match {
case Nil => None
case stepp :: stepps =>
(EngineStep.Zipper.currentify(stepp), focus.skip.some).mapN((curr, stepd) =>
Zipper(
id,
atomId,
stepps,
curr,
(done :+ stepd) ::: toSkip.map(_.copy(skipped = EngineStep.Skipped(true)))
)
)
}
} else this.some

/**
* Obtain the resulting `Sequence` only if all `Step`s have been completed. This is a special
* way of *unzipping* a `Zipper`.
*/
val uncurrentify: Option[Sequence[F]] =
if (remaining.isEmpty)
if (focus.skipMark.value)
Sequence(id,
atomId,
(done :+ focus.skip) ::: toSkip.map(_.copy(skipped = EngineStep.Skipped(true)))
).some
else
focus.uncurrentify.map(x =>
Sequence(id,
atomId,
(done :+ x) ::: toSkip.map(_.copy(skipped = EngineStep.Skipped(true)))
)
)
if (pending.isEmpty)
focus.uncurrentify.map(x => Sequence(id, atomId, done :+ x))
else None

/**
Expand Down Expand Up @@ -195,12 +161,8 @@ object Sequence {

def rollback: State[F]

def skips: Option[State[F]]

def setBreakpoints(stepId: List[Step.Id], v: Breakpoint): State[F]

def setSkipMark(stepId: Step.Id, v: Boolean): State[F]

def getCurrentBreakpoint: Boolean

/**
Expand Down Expand Up @@ -307,7 +269,7 @@ object Sequence {
else {
val oldSeq = st.toSequence
val updSteps = oldSeq.steps.zip(steps).map { case (o, n) =>
n.copy(breakpoint = o.breakpoint, skipMark = o.skipMark)
n.copy(breakpoint = o.breakpoint)
} ++ steps.drop(oldSeq.steps.length)
init(oldSeq.copy(steps = updSteps))
}
Expand Down Expand Up @@ -344,30 +306,13 @@ object Sequence {

override def rollback: Zipper[F] = self.copy(zipper = zipper.rollback)

override def skips: Option[State[F]] = zipper.skips match {
// Last execution
case None => zipper.uncurrentify.map(Final[F](_, status))
case Some(x) => Zipper(x, status, singleRuns).some
}

override def setBreakpoints(stepId: List[Step.Id], v: Breakpoint): State[F] =
self.copy(zipper =
zipper.copy(pending =
zipper.pending.map(s => if (stepId.exists(_ === s.id)) s.copy(breakpoint = v) else s)
)
)

override def setSkipMark(stepId: Step.Id, v: Boolean): State[F] = self.copy(zipper =
if (zipper.focus.id == stepId)
zipper.copy(focus = zipper.focus.copy(skipMark = EngineStep.SkipMark(v)))
else
zipper.copy(pending =
zipper.pending.map(s =>
if (s.id == stepId) s.copy(skipMark = EngineStep.SkipMark(v)) else s
)
)
)

override def getCurrentBreakpoint: Boolean =
(zipper.focus.breakpoint === Breakpoint.Enabled) && zipper.focus.done.isEmpty

Expand Down Expand Up @@ -458,12 +403,8 @@ object Sequence {

override def rollback: Final[F] = self

override def skips: Option[State[F]] = self.some

override def setBreakpoints(stepId: List[Step.Id], v: Breakpoint): State[F] = self

override def setSkipMark(stepId: Step.Id, v: Boolean): State[F] = self

override def getCurrentBreakpoint: Boolean = false

override val done: List[EngineStep[F]] = seq.steps
Expand Down
6 changes: 0 additions & 6 deletions modules/engine/src/main/scala/observe/engine/UserEvent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ object UserEvent {
steps: List[Step.Id],
v: Breakpoint
) extends UserEvent[F, S, U]
case class SkipMark[F[_], S, U](
id: Observation.Id,
user: Option[User],
step: Step.Id,
v: Boolean
) extends UserEvent[F, S, U]
case class Poll[F[_], S, U](clientId: ClientId) extends UserEvent[F, S, U] {
val user: Option[User] = None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ class SequenceSuite extends munit.CatsEffectSuite {
EngineStep.Zipper(
id = stepId(1),
breakpoint = Breakpoint.Disabled,
EngineStep.SkipMark(false),
pending = pending,
focus = focus,
done = done.map(_.map { r =>
Expand Down
Loading

0 comments on commit 7f0260b

Please sign in to comment.