Skip to content

Commit

Permalink
chore: use Hub
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 11, 2025
1 parent 9844a1b commit 387c56b
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 240 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,41 +22,7 @@ import pekko.stream.ActorAttributes
import pekko.stream.impl.ActorSubscriberMessage.{ OnComplete, OnError, OnNext, OnSubscribe }
import pekko.util.unused

import org.reactivestreams.{ Processor, Subscriber, Subscription }

/**
* INTERNAL API
*/
@InternalApi private[pekko] object ActorProcessor {

def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = {
val p = new ActorProcessor[I, O](impl)
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.
impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]])
p
}
}

/**
* INTERNAL API
*/
@InternalApi private[pekko] class ActorProcessor[I, O](impl: ActorRef)
extends ActorPublisher[O](impl)
with Processor[I, O] {
override def onSubscribe(s: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(s)
impl ! OnSubscribe(s)
}
override def onError(t: Throwable): Unit = {
ReactiveStreamsCompliance.requireNonNullException(t)
impl ! OnError(t)
}
override def onComplete(): Unit = impl ! OnComplete
override def onNext(elem: I): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
impl ! OnNext(elem)
}
}
import org.reactivestreams.{ Subscriber, Subscription }

/**
* INTERNAL API
Expand Down

This file was deleted.

18 changes: 0 additions & 18 deletions stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,24 +108,6 @@ import org.reactivestreams.Subscriber
new PublisherSink[In](attr, amendShape(attr))
}

/**
* INTERNAL API
*/
@InternalApi private[pekko] final class FanoutPublisherSink[In](val attributes: Attributes, shape: SinkShape[In])
extends SinkModule[In, Publisher[In]](shape) {

override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = {
val impl = context.materializer.actorOf(context, FanoutProcessorImpl.props(context.effectiveAttributes))
val fanoutProcessor = new ActorProcessor[In, In](impl)
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.
impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]])
(fanoutProcessor, fanoutProcessor)
}

override def withAttributes(attr: Attributes): SinkModule[In, Publisher[In]] =
new FanoutPublisherSink[In](attr, amendShape(attr))
}

/**
* INTERNAL API
* Attaches a subscriber to this stream.
Expand Down
Loading

0 comments on commit 387c56b

Please sign in to comment.