From 387c56b9ad0a1673520e2bd40f18faa955817c44 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 11 Jan 2025 00:44:47 +0800 Subject: [PATCH] chore: use Hub --- .../pekko/stream/impl/ActorProcessor.scala | 36 +--- .../pekko/stream/impl/FanoutProcessor.scala | 183 ------------------ .../org/apache/pekko/stream/impl/Sinks.scala | 18 -- .../impl/fusing/FanoutPublisherSink.scala | 162 ++++++++++++++++ .../apache/pekko/stream/scaladsl/Sink.scala | 6 +- 5 files changed, 165 insertions(+), 240 deletions(-) delete mode 100644 stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala create mode 100644 stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FanoutPublisherSink.scala diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala index 486db489eda..0585fcb27ca 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala @@ -22,41 +22,7 @@ import pekko.stream.ActorAttributes import pekko.stream.impl.ActorSubscriberMessage.{ OnComplete, OnError, OnNext, OnSubscribe } import pekko.util.unused -import org.reactivestreams.{ Processor, Subscriber, Subscription } - -/** - * INTERNAL API - */ -@InternalApi private[pekko] object ActorProcessor { - - def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = { - val p = new ActorProcessor[I, O](impl) - // Resolve cyclic dependency with actor. This MUST be the first message no matter what. - impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]]) - p - } -} - -/** - * INTERNAL API - */ -@InternalApi private[pekko] class ActorProcessor[I, O](impl: ActorRef) - extends ActorPublisher[O](impl) - with Processor[I, O] { - override def onSubscribe(s: Subscription): Unit = { - ReactiveStreamsCompliance.requireNonNullSubscription(s) - impl ! OnSubscribe(s) - } - override def onError(t: Throwable): Unit = { - ReactiveStreamsCompliance.requireNonNullException(t) - impl ! OnError(t) - } - override def onComplete(): Unit = impl ! OnComplete - override def onNext(elem: I): Unit = { - ReactiveStreamsCompliance.requireNonNullElement(elem) - impl ! OnNext(elem) - } -} +import org.reactivestreams.{ Subscriber, Subscription } /** * INTERNAL API diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala deleted file mode 100644 index d217f7b873a..00000000000 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2018-2022 Lightbend Inc. - */ - -package org.apache.pekko.stream.impl - -import org.apache.pekko -import pekko.actor.Actor -import pekko.actor.ActorRef -import pekko.actor.Deploy -import pekko.actor.Props -import pekko.annotation.InternalApi -import pekko.stream.ActorAttributes.StreamSubscriptionTimeout -import pekko.stream.Attributes -import pekko.stream.StreamSubscriptionTimeoutTerminationMode -import pekko.util.OptionVal - -import org.reactivestreams.Subscriber - -/** - * INTERNAL API - */ -@InternalApi private[pekko] abstract class FanoutOutputs( - val maxBufferSize: Int, - val initialBufferSize: Int, - self: ActorRef, - val pump: Pump) - extends DefaultOutputTransferStates - with SubscriberManagement[Any] { - - private var _subscribed = false - def subscribed: Boolean = _subscribed - - override type S = ActorSubscriptionWithCursor[_ >: Any] - override def createSubscription(subscriber: Subscriber[_ >: Any]): S = { - _subscribed = true - new ActorSubscriptionWithCursor(self, subscriber) - } - - protected var exposedPublisher: ActorPublisher[Any] = _ - - private var downstreamBufferSpace: Long = 0L - private var downstreamCompleted = false - override def demandAvailable = downstreamBufferSpace > 0 - override def demandCount: Long = downstreamBufferSpace - - override val subreceive = new SubReceive(waitingExposedPublisher) - - def enqueueOutputElement(elem: Any): Unit = { - ReactiveStreamsCompliance.requireNonNullElement(elem) - downstreamBufferSpace -= 1 - pushToDownstream(elem) - } - - override def complete(): Unit = - if (!downstreamCompleted) { - downstreamCompleted = true - completeDownstream() - } - - override def cancel(): Unit = complete() - - override def error(e: Throwable): Unit = { - if (!downstreamCompleted) { - downstreamCompleted = true - abortDownstream(e) - if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) - } - } - - def isClosed: Boolean = downstreamCompleted - - def afterShutdown(): Unit - - override protected def requestFromUpstream(elements: Long): Unit = downstreamBufferSpace += elements - - private def subscribePending(): Unit = - exposedPublisher.takePendingSubscribers().foreach(registerSubscriber) - - override protected def shutdown(completed: Boolean): Unit = { - if (exposedPublisher ne null) { - if (completed) exposedPublisher.shutdown(None) - else exposedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason) - } - afterShutdown() - } - - override protected def cancelUpstream(): Unit = { - downstreamCompleted = true - } - - protected def waitingExposedPublisher: Actor.Receive = { - case ExposedPublisher(publisher) => - exposedPublisher = publisher - subreceive.become(downstreamRunning) - case other => - throw new IllegalStateException(s"The first message must be ExposedPublisher but was [$other]") - } - - protected def downstreamRunning: Actor.Receive = { - case SubscribePending => - subscribePending() - case RequestMore(subscription, elements) => - moreRequested(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]], elements) - pump.pump() - case Cancel(subscription) => - unregisterSubscription(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]]) - pump.pump() - } - -} - -/** - * INTERNAL API - */ -@InternalApi private[pekko] object FanoutProcessorImpl { - def props(attributes: Attributes): Props = - Props(new FanoutProcessorImpl(attributes)).withDeploy(Deploy.local) -} - -/** - * INTERNAL API - */ -@InternalApi private[pekko] class FanoutProcessorImpl(attributes: Attributes) extends ActorProcessorImpl(attributes) { - - val StreamSubscriptionTimeout(timeout, timeoutMode) = attributes.mandatoryAttribute[StreamSubscriptionTimeout] - val timeoutTimer = if (timeoutMode != StreamSubscriptionTimeoutTerminationMode.noop) { - import context.dispatcher - OptionVal.Some(context.system.scheduler.scheduleOnce(timeout, self, ActorProcessorImpl.SubscriptionTimeout)) - } else OptionVal.None - - override val primaryOutputs: FanoutOutputs = { - val inputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer] - new FanoutOutputs(inputBuffer.max, inputBuffer.initial, self, this) { - override def afterShutdown(): Unit = afterFlush() - } - } - - val running: TransferPhase = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () => - primaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement()) - } - - override def pumpFinished(): Unit = { - primaryInputs.cancel() - primaryOutputs.complete() - } - - override def postStop(): Unit = { - super.postStop() - timeoutTimer match { - case OptionVal.Some(timer) => timer.cancel() - case _ => - } - } - - def afterFlush(): Unit = context.stop(self) - - initialPhase(1, running) - - def subTimeoutHandling: Receive = { - case ActorProcessorImpl.SubscriptionTimeout => - import StreamSubscriptionTimeoutTerminationMode._ - if (!primaryOutputs.subscribed) { - timeoutMode match { - case CancelTermination => - primaryInputs.cancel() - context.stop(self) - case WarnTermination => - log.warning("Subscription timeout for {}", this) - case NoopTermination => // won't happen - } - } - } -} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala index 380b27e2dbd..67a282e1ed6 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala @@ -108,24 +108,6 @@ import org.reactivestreams.Subscriber new PublisherSink[In](attr, amendShape(attr)) } -/** - * INTERNAL API - */ -@InternalApi private[pekko] final class FanoutPublisherSink[In](val attributes: Attributes, shape: SinkShape[In]) - extends SinkModule[In, Publisher[In]](shape) { - - override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = { - val impl = context.materializer.actorOf(context, FanoutProcessorImpl.props(context.effectiveAttributes)) - val fanoutProcessor = new ActorProcessor[In, In](impl) - // Resolve cyclic dependency with actor. This MUST be the first message no matter what. - impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]]) - (fanoutProcessor, fanoutProcessor) - } - - override def withAttributes(attr: Attributes): SinkModule[In, Publisher[In]] = - new FanoutPublisherSink[In](attr, amendShape(attr)) -} - /** * INTERNAL API * Attaches a subscriber to this stream. diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FanoutPublisherSink.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FanoutPublisherSink.scala new file mode 100644 index 00000000000..dcbd5bc0f5f --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FanoutPublisherSink.scala @@ -0,0 +1,162 @@ +package org.apache.pekko.stream.impl.fusing + +import org.apache.pekko.NotUsed +import org.apache.pekko.annotation.InternalApi +import org.apache.pekko.dispatch.ExecutionContexts +import org.apache.pekko.stream.ActorAttributes.StreamSubscriptionTimeout +import org.apache.pekko.stream.StreamSubscriptionTimeoutTerminationMode.{ + CancelTermination, + NoopTermination, + WarnTermination +} +import org.apache.pekko.stream.impl.CancelledSubscription +import org.apache.pekko.stream.impl.Stages.DefaultAttributes +import org.apache.pekko.stream.scaladsl.{ BroadcastHub, Keep, Sink, Source } +import org.apache.pekko.stream.{ Attributes, Inlet, OverflowStrategy, SinkShape } +import org.apache.pekko.stream.stage.{ + AsyncCallback, + GraphStageLogic, + GraphStageWithMaterializedValue, + InHandler, + OutHandler, + StageLogging, + TimerGraphStageLogic +} +import org.reactivestreams.{ Publisher, Subscriber } + +import java.util.concurrent.atomic.AtomicReference +import scala.annotation.tailrec +import scala.util.{ Failure, Success } + +@InternalApi +private[pekko] object FanoutPublisherSink { + private trait FanoutPublisherSinkState + private case class Open[T](callback: AsyncCallback[Subscriber[_ >: T]]) extends FanoutPublisherSinkState + private case class Closed(failure: Option[Throwable]) extends FanoutPublisherSinkState + private object TimerKey +} + +/** + * INTERNAL API + */ +@InternalApi private[pekko] final class FanoutPublisherSink[In] + extends GraphStageWithMaterializedValue[SinkShape[In], Publisher[In]] { + private val in = Inlet[In]("FanoutPublisherSink.in") + override val shape: SinkShape[In] = SinkShape.of(in) + override protected def initialAttributes: Attributes = DefaultAttributes.fanoutPublisherSink + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Publisher[In]) = { + val logic: GraphStageLogic with InHandler with Publisher[In] = + new TimerGraphStageLogic(shape) // for subscribe timeout + with StageLogging + with InHandler + with Publisher[In] { + + import FanoutPublisherSink._ + private[this] val stateRef = new AtomicReference[FanoutPublisherSinkState]( + Open(getAsyncCallback(onSubscribe))) + + private val StreamSubscriptionTimeout(timeout, timeoutMode) = + inheritedAttributes.mandatoryAttribute[StreamSubscriptionTimeout] + private val bufferSize = inheritedAttributes.mandatoryAttribute[Attributes.InputBuffer].max + private val subOutlet = new SubSourceOutlet[In]("FanoutPublisherSink.subSink") + private var publisherSource: Source[In, NotUsed] = _ + + override def onPush(): Unit = { + subOutlet.push(grab(in)) + } + + override def onUpstreamFinish(): Unit = { + stateRef.getAndSet(Closed(None)) + subOutlet.complete() + } + override def onUpstreamFailure(ex: Throwable): Unit = { + stateRef.getAndSet(Closed(Some(ex))) + subOutlet.fail(ex) + } + + subOutlet.setHandler(new OutHandler { + override def onPull(): Unit = if (isClosed(in)) completeStage() else pull(in) + override def onDownstreamFinish(cause: Throwable): Unit = + if (cause != null) { + failStage(cause) + } else { + completeStage() + } + }) + + setHandler(in, this) + + override def preStart(): Unit = { + publisherSource = interpreter.subFusingMaterializer.materialize( + Source.fromGraph(subOutlet.source).toMat( + BroadcastHub.sink[In](1, bufferSize))(Keep.right)) + timeoutMode match { + case NoopTermination => // do nothing + case _ => + scheduleOnce(TimerKey, timeout) + } + } + + override protected def onTimer(timerKey: Any): Unit = { + // no subscriber connected, cancel + timeoutMode match { + case CancelTermination => + completeStage() + case WarnTermination => + log.warning( + s"No subscriber has been attached to FanoutPublisherSink after $timeout," + + "Please cancelling the source stream to avoid memory leaks.") + case _ => // do nothing + } + } + + private def onSubscribe(subscriber: Subscriber[_ >: In]): Unit = { + cancelTimer(TimerKey) + interpreter.subFusingMaterializer.materialize( + publisherSource + .buffer(1, OverflowStrategy.backpressure) // needed to get the complete signal from the hub. + .to(Sink.fromSubscriber(subscriber))) + } + + override def subscribe(subscriber: Subscriber[_ >: In]): Unit = { + import org.apache.pekko.stream.impl.ReactiveStreamsCompliance._ + requireNonNullSubscriber(subscriber) + stateRef.get() match { + case Closed(Some(ex)) => + tryOnSubscribe(subscriber, CancelledSubscription) + tryOnError(subscriber, ex) + + case Closed(None) => + tryOnSubscribe(subscriber, CancelledSubscription) + tryOnComplete(subscriber) + + case Open(callback: AsyncCallback[Subscriber[_]]) => + callback + .invokeWithFeedback(subscriber.asInstanceOf[Subscriber[Any]]) + .onComplete { + case Failure(exception) => subscriber.onError(exception) + case Success(_) => // invoked, waiting materialization + }(ExecutionContexts.parasitic) + case _ => throw new IllegalArgumentException("Should not happen") + } + } + + override def postStop(): Unit = { + cancelTimer(TimerKey) + @tailrec + def tryClose(): Unit = { + stateRef.get() match { + case Closed(_) => // do nothing + case open => + if (!stateRef.compareAndSet(open, Closed(None))) { + tryClose() + } + } + } + tryClose() + } + } + + (logic, logic) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index 49a52e418d8..8cd5b442174 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -18,7 +18,6 @@ import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success, Try } - import org.apache.pekko import pekko.{ util, Done, NotUsed } import pekko.actor.{ ActorRef, Status } @@ -27,10 +26,9 @@ import pekko.dispatch.ExecutionContexts import pekko.stream._ import pekko.stream.impl._ import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.GraphStages +import pekko.stream.impl.fusing.{ FanoutPublisherSink, GraphStages } import pekko.stream.stage._ import pekko.util.ccompat._ - import org.reactivestreams.{ Publisher, Subscriber } /** @@ -300,7 +298,7 @@ object Sink { */ def asPublisher[T](fanout: Boolean): Sink[T, Publisher[T]] = fromGraph( - if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink")) + if (fanout) new FanoutPublisherSink[T]() else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink"))) /**