Skip to content

Commit

Permalink
chore: Remove SupervisedGraphStageLogic (#1619)
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin authored Dec 22, 2024
1 parent 65fc751 commit 8bb6e22
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,2 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.alsoTo")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.alsoToContext")
Original file line number Diff line number Diff line change
@@ -1,2 +1,19 @@
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.wireTap")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.wireTapContext")
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.wireTap")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.wireTapContext")
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Remove SupervisedGraphStageLogic
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.fusing.SupervisedGraphStageLogic")
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import scala.util.control.Exception.Catcher

import org.apache.pekko
import pekko.actor.{ ActorRef, Terminated }
import pekko.annotation.{ DoNotInherit, InternalApi }
import pekko.annotation.InternalApi
import pekko.event._
import pekko.event.Logging.LogLevel
import pekko.stream.{ Supervision, _ }
Expand Down Expand Up @@ -206,33 +206,6 @@ import pekko.util.ccompat._
override def toString = "DropWhile"
}

/**
* INTERNAL API
*/
@DoNotInherit private[pekko] abstract class SupervisedGraphStageLogic(inheritedAttributes: Attributes, shape: Shape)
extends GraphStageLogic(shape) {
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

def withSupervision[T](f: () => T): Option[T] =
try {
Some(f())
} catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop => onStop(ex)
case Supervision.Resume => onResume(ex)
case Supervision.Restart => onRestart(ex)
}
None
}

def onResume(t: Throwable): Unit

def onStop(t: Throwable): Unit = failStage(t)

def onRestart(t: Throwable): Unit = onResume(t)
}

private[stream] object Collect {
// Cached function that can be used with PartialFunction.applyOrElse to ensure that A) the guard is only applied once,
// and the caller can check the returned value with Collect.notApplied to query whether the PF was applied or not.
Expand Down

0 comments on commit 8bb6e22

Please sign in to comment.